Skip to content

Commit

Permalink
remove _make_collection - isn't needed
Browse files Browse the repository at this point in the history
  • Loading branch information
rjzamora committed Mar 21, 2024
1 parent 1b6ef7d commit fcc0e04
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 52 deletions.
41 changes: 21 additions & 20 deletions dask_cuda/benchmarks/local_cudf_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pandas as pd

import dask
from dask.base import tokenize
import dask.dataframe as dd
from dask.distributed import performance_report, wait
from dask.utils import format_bytes, parse_bytes

Expand All @@ -19,7 +19,6 @@
print_separator,
print_throughput_bandwidth,
)
from dask_cuda.utils import _make_collection

# Benchmarking cuDF merge operation based on
# <https://gist.github.com/rjzamora/0ffc35c19b5180ab04bbf7c793c45955>
Expand All @@ -30,7 +29,9 @@
dask.config.set({"dataframe.shuffle.method": "tasks"})


def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu):
def generate_chunk(input):
i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu = input

# Setting a seed that triggers max amount of comm in the two-GPU case.
if gpu:
import cupy as xp
Expand Down Expand Up @@ -111,25 +112,25 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args):

parts = [chunk_size for _ in range(num_chunks)]
device_type = True if args.type == "gpu" else False
meta = generate_chunk(0, 4, 1, chunk_type, None, device_type)
meta = generate_chunk((0, 4, 1, chunk_type, None, device_type))
divisions = [None] * (len(parts) + 1)

name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type)

graph = {
(name, i): (
generate_chunk,
i,
part,
len(parts),
chunk_type,
frac_match,
device_type,
)
for i, part in enumerate(parts)
}

ddf = _make_collection(graph, name, meta, divisions)
ddf = dd.from_map(
generate_chunk,
[
(
i,
part,
len(parts),
chunk_type,
frac_match,
device_type,
)
for i, part in enumerate(parts)
],
meta=meta,
divisions=divisions,
)

if chunk_type == "build":
if not args.no_shuffle:
Expand Down
19 changes: 12 additions & 7 deletions dask_cuda/benchmarks/local_cudf_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
print_separator,
print_throughput_bandwidth,
)
from dask_cuda.utils import _make_collection

try:
import cupy
Expand Down Expand Up @@ -93,18 +92,24 @@ def create_data(
)

# Create partitions based on the specified partition distribution
dsk = {}
futures = []
for i, part_size in enumerate(dist):
for _ in range(part_size):
# We use `client.submit` to control placement of the partition.
dsk[(name, len(dsk))] = client.submit(
create_df, chunksize, args.type, workers=[workers[i]], pure=False
futures.append(
client.submit(
create_df, chunksize, args.type, workers=[workers[i]], pure=False
)
)
wait(dsk.values())
wait(futures)

df_meta = create_df(0, args.type)
divs = [None] * (len(dsk) + 1)
ret = _make_collection(dsk, name, df_meta, divs).persist()
divs = [None] * (len(futures) + 1)
ret = dask.dataframe.from_delayed(
futures,
meta=df_meta,
divisions=divs,
).persist()
wait(ret)

data_processed = args.in_parts * args.partition_size
Expand Down
20 changes: 11 additions & 9 deletions dask_cuda/explicit_comms/dataframe/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,23 +529,25 @@ def shuffle(
# TODO: can we do this without using `submit()` to avoid the overhead
# of creating a Future for each dataframe partition?

dsk = {}
futures = []
for rank in ranks:
for part_id in rank_to_out_part_ids[rank]:
dsk[(name, part_id)] = c.client.submit(
getitem,
shuffle_result[rank],
part_id,
workers=[c.worker_addresses[rank]],
futures.append(
c.client.submit(
getitem,
shuffle_result[rank],
part_id,
workers=[c.worker_addresses[rank]],
)
)

# Create a distributed Dataframe from all the pieces
divs = [None] * (len(dsk) + 1)
ret = dd.from_delayed(dsk.values(), meta=df_meta, divisions=divs).persist()
divs = [None] * (len(futures) + 1)
ret = dd.from_delayed(futures, meta=df_meta, divisions=divs).persist()
wait([ret])

# Release all temporary dataframes
for fut in [*shuffle_result.values(), *dsk.values()]:
for fut in [*shuffle_result.values(), *futures]:
fut.release()
return ret

Expand Down
16 changes: 0 additions & 16 deletions dask_cuda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,19 +764,3 @@ def get_rmm_memory_resource_stack(mr) -> list:
if isinstance(mr, rmm.mr.StatisticsResourceAdaptor):
return mr.allocation_counts["current_bytes"]
return None


def _make_collection(graph, name, meta, divisions):
    """Create a DataFrame collection from a task graph.

    Accounts for the legacy vs. expression-based (dask-expr) DataFrame API:
    ``dask.dataframe.from_graph`` is used when it can be imported, otherwise
    ``dask.dataframe.core.new_dd_object`` is used.

    Parameters
    ----------
    graph
        Task graph. Its output tasks are expected to be keyed
        ``(name, 0) ... (name, npartitions - 1)``.
    name
        Key prefix shared by the output tasks of ``graph``.
    meta
        Object describing the collection's structure; forwarded unchanged.
    divisions
        Partition boundaries. Holds one more entry than there are
        partitions (e.g. ``[None] * (npartitions + 1)``).

    Returns
    -------
    The collection object produced by whichever constructor was available.
    """
    try:
        # New expression-based API
        from dask.dataframe import from_graph

        # ``divisions`` has ``npartitions + 1`` entries, so there is exactly
        # one output key per *gap* between divisions. Using
        # ``len(divisions)`` here would reference a key ``(name, npartitions)``
        # that does not exist in ``graph``.
        npartitions = len(divisions) - 1
        keys = [(name, i) for i in range(npartitions)]
        return from_graph(graph, meta, divisions, keys, name)
    except ImportError:
        # Legacy API
        from dask.dataframe.core import new_dd_object

        return new_dd_object(graph, name, meta, divisions)

0 comments on commit fcc0e04

Please sign in to comment.