From fcc0e04a7c21f163df04d4eea49272e194146dd8 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 21 Mar 2024 08:58:57 -0700
Subject: [PATCH] Remove _make_collection - it isn't needed

---
 dask_cuda/benchmarks/local_cudf_merge.py      | 41 ++++++++++---------
 dask_cuda/benchmarks/local_cudf_shuffle.py    | 19 +++++----
 dask_cuda/explicit_comms/dataframe/shuffle.py | 20 +++++----
 dask_cuda/utils.py                            | 16 --------
 4 files changed, 44 insertions(+), 52 deletions(-)

diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index 154b70c74..4095dd392 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -7,7 +7,7 @@ import pandas as pd
 
 import dask
-from dask.base import tokenize
+import dask.dataframe as dd
 from dask.distributed import performance_report, wait
 from dask.utils import format_bytes, parse_bytes
 
@@ -19,7 +19,6 @@
     print_separator,
     print_throughput_bandwidth,
 )
-from dask_cuda.utils import _make_collection
 
 # Benchmarking cuDF merge operation based on
 #
@@ -30,7 +29,9 @@ dask.config.set({"dataframe.shuffle.method": "tasks"})
 
 
-def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu):
+def generate_chunk(input):
+    i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu = input
+
     # Setting a seed that triggers max amount of comm in the two-GPU case.
     if gpu:
         import cupy as xp
 
@@ -111,25 +112,25 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args):
 
     parts = [chunk_size for _ in range(num_chunks)]
     device_type = True if args.type == "gpu" else False
-    meta = generate_chunk(0, 4, 1, chunk_type, None, device_type)
+    meta = generate_chunk((0, 4, 1, chunk_type, None, device_type))
     divisions = [None] * (len(parts) + 1)
 
-    name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type)
-
-    graph = {
-        (name, i): (
-            generate_chunk,
-            i,
-            part,
-            len(parts),
-            chunk_type,
-            frac_match,
-            device_type,
-        )
-        for i, part in enumerate(parts)
-    }
-
-    ddf = _make_collection(graph, name, meta, divisions)
+    ddf = dd.from_map(
+        generate_chunk,
+        [
+            (
+                i,
+                part,
+                len(parts),
+                chunk_type,
+                frac_match,
+                device_type,
+            )
+            for i, part in enumerate(parts)
+        ],
+        meta=meta,
+        divisions=divisions,
+    )
 
     if chunk_type == "build":
         if not args.no_shuffle:
diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py
index 249137b12..6899243e1 100644
--- a/dask_cuda/benchmarks/local_cudf_shuffle.py
+++ b/dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -20,7 +20,6 @@
     print_separator,
     print_throughput_bandwidth,
 )
-from dask_cuda.utils import _make_collection
 
 try:
     import cupy
@@ -93,18 +92,24 @@ def create_data(
     )
 
     # Create partition based to the specified partition distribution
-    dsk = {}
+    futures = []
     for i, part_size in enumerate(dist):
         for _ in range(part_size):
             # We use `client.submit` to control placement of the partition.
-            dsk[(name, len(dsk))] = client.submit(
-                create_df, chunksize, args.type, workers=[workers[i]], pure=False
+            futures.append(
+                client.submit(
+                    create_df, chunksize, args.type, workers=[workers[i]], pure=False
+                )
             )
-    wait(dsk.values())
+    wait(futures)
 
     df_meta = create_df(0, args.type)
-    divs = [None] * (len(dsk) + 1)
-    ret = _make_collection(dsk, name, df_meta, divs).persist()
+    divs = [None] * (len(futures) + 1)
+    ret = dask.dataframe.from_delayed(
+        futures,
+        meta=df_meta,
+        divisions=divs,
+    ).persist()
     wait(ret)
 
     data_processed = args.in_parts * args.partition_size
diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index 0e6a2f96c..71e47f5b7 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -529,23 +529,25 @@ def shuffle(
 
     # TODO: can we do this without using `submit()` to avoid the overhead
     # of creating a Future for each dataframe partition?
-    dsk = {}
+    futures = []
     for rank in ranks:
         for part_id in rank_to_out_part_ids[rank]:
-            dsk[(name, part_id)] = c.client.submit(
-                getitem,
-                shuffle_result[rank],
-                part_id,
-                workers=[c.worker_addresses[rank]],
+            futures.append(
+                c.client.submit(
+                    getitem,
+                    shuffle_result[rank],
+                    part_id,
+                    workers=[c.worker_addresses[rank]],
+                )
             )
 
     # Create a distributed Dataframe from all the pieces
-    divs = [None] * (len(dsk) + 1)
-    ret = dd.from_delayed(dsk.values(), meta=df_meta, divisions=divs).persist()
+    divs = [None] * (len(futures) + 1)
+    ret = dd.from_delayed(futures, meta=df_meta, divisions=divs).persist()
     wait([ret])
 
     # Release all temporary dataframes
-    for fut in [*shuffle_result.values(), *dsk.values()]:
+    for fut in [*shuffle_result.values(), *futures]:
         fut.release()
 
     return ret
diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py
index ffd549e0f..ff4dbbae3 100644
--- a/dask_cuda/utils.py
+++ b/dask_cuda/utils.py
@@ -764,19 +764,3 @@ def get_rmm_memory_resource_stack(mr) -> list:
     if isinstance(mr, rmm.mr.StatisticsResourceAdaptor):
         return mr.allocation_counts["current_bytes"]
     return None
-
-
-def _make_collection(graph, name, meta, divisions):
-    # Create a DataFrame collection from a task graph.
-    # Accounts for legacy vs dask-expr API
-    try:
-        # New expression-based API
-        from dask.dataframe import from_graph
-
-        keys = [(name, i) for i in range(len(divisions))]
-        return from_graph(graph, meta, divisions, keys, name)
-    except ImportError:
-        # Legacy API
-        from dask.dataframe.core import new_dd_object
-
-        return new_dd_object(graph, name, meta, divisions)
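
Note (illustrative, not part of the patch): the replacement pattern above builds
DataFrame collections with dask.dataframe.from_map (from a list of per-partition
inputs) and dask.dataframe.from_delayed (from client.submit futures) instead of
hand-built task graphs. A minimal sketch of the from_map pattern follows; it
assumes a dask version that provides from_map (2022.05 or newer), and the toy
make_part helper, the (seed, nrows) tuples, and the random key/payload columns
are invented for this sketch only:

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    def make_part(part_info):
        # from_map calls this once per element of the input list; the patch
        # packs all per-partition parameters into a single tuple the same way.
        seed, nrows = part_info
        rng = np.random.default_rng(seed)
        return pd.DataFrame(
            {"key": rng.integers(0, 100, nrows), "payload": rng.random(nrows)}
        )

    # An empty frame describing the schema, analogous to the `meta` computed
    # by calling generate_chunk() on a tiny input in the benchmark.
    meta = make_part((0, 0))
    ddf = dd.from_map(make_part, [(i, 10) for i in range(4)], meta=meta)
    assert ddf.npartitions == 4

For the benchmark and explicit-comms changes, the futures returned by
client.submit are handed to dask.dataframe.from_delayed together with the same
meta and divisions arguments, which achieves the same collection construction
without an explicit task graph or the removed _make_collection helper.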