diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index f7ee2361e..4228d21e3 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -1,8 +1,9 @@ import dask import dask.dataframe.shuffle -from .explicit_comms.dataframe.shuffle import get_rearrange_by_column_tasks_wrapper + from ._version import get_versions from .cuda_worker import CUDAWorker +from .explicit_comms.dataframe.shuffle import get_rearrange_by_column_tasks_wrapper from .local_cuda_cluster import LocalCUDACluster __version__ = get_versions()["version"] diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py index 382721eb4..c8259cecb 100644 --- a/dask_cuda/benchmarks/local_cupy.py +++ b/dask_cuda/benchmarks/local_cupy.py @@ -1,5 +1,6 @@ import asyncio from collections import defaultdict +from json import dump from time import perf_counter as clock from warnings import filterwarnings @@ -60,6 +61,24 @@ async def _run(client, args): func_args = (x,) func = lambda x: np.fft.fft(x, axis=0) + elif args.operation == "sum": + x = rs.random((args.size, args.size), chunks=args.chunk_size).persist() + await wait(x) + func_args = (x,) + + func = lambda x: x.sum() + elif args.operation == "mean": + x = rs.random((args.size, args.size), chunks=args.chunk_size).persist() + await wait(x) + func_args = (x,) + + func = lambda x: x.mean() + elif args.operation == "slice": + x = rs.random((args.size, args.size), chunks=args.chunk_size).persist() + await wait(x) + func_args = (x,) + + func = lambda x: x[::3].copy() shape = x.shape chunksize = x.chunksize @@ -179,6 +198,39 @@ async def run(args): ) print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)])) + if args.benchmark_json: + + d = { + "operation": args.operation, + "size": args.size, + "second_size": args.second_size, + "chunk_size": args.chunk_size, + "compute_size": size, + "compute_chunk_size": chunksize, + "ignore_size": format_bytes(args.ignore_size), + "protocol": args.protocol, + "devs": args.devs, + "threads_per_worker": args.threads_per_worker, + "times": [ + {"wall_clock": took, "npartitions": npartitions} + for (took, npartitions) in took_list + ], + "bandwidths": { + f"({d1},{d2})" + if args.multi_node or args.sched_addr + else "(%02d,%02d)" + % (d1, d2): { + "25%": bw[0], + "50%": bw[1], + "75%": bw[2], + "total_nbytes": total_nbytes[(d1, d2)], + } + for (d1, d2), bw in sorted(bandwidths.items()) + }, + } + with open(args.benchmark_json, "w") as fp: + dump(d, fp, indent=2) + # An SSHCluster will not automatically shut down, we have to # ensure it does. if args.multi_node: @@ -206,29 +258,40 @@ def parse_args(): "choices": ["cpu", "gpu"], "default": "gpu", "type": str, - "help": "Do merge with GPU or CPU dataframes", + "help": "Do merge with GPU or CPU dataframes.", }, { "name": ["-o", "--operation",], "default": "transpose_sum", "type": str, "help": "The operation to run, valid options are: " - "'transpose_sum' (default), 'dot', 'fft', 'svd'.", + "'transpose_sum' (default), 'dot', 'fft', 'svd', 'sum', 'mean', 'slice'.", }, { "name": ["-c", "--chunk-size",], "default": "2500", "type": int, - "help": "Chunk size (default 2500)", + "help": "Chunk size (default 2500).", }, { "name": "--ignore-size", "default": "1 MiB", "metavar": "nbytes", "type": parse_bytes, - "help": "Ignore messages smaller than this (default '1 MB')", + "help": "Ignore messages smaller than this (default '1 MB').", + }, + { + "name": "--runs", + "default": 3, + "type": int, + "help": "Number of runs (default 3).", + }, + { + "name": "--benchmark-json", + "default": None, + "type": str, + "help": "Dump a JSON report of benchmarks (optional).", }, - {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",}, ] return parse_benchmark_args(