From 015a0eaec1848f420ce66ab9742990c0df911d40 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 11 Feb 2021 08:27:51 -0800 Subject: [PATCH 1/7] Add cross product, sum, mean operations --- dask_cuda/benchmarks/local_cupy.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py index 382721eb4..79ede7a71 100644 --- a/dask_cuda/benchmarks/local_cupy.py +++ b/dask_cuda/benchmarks/local_cupy.py @@ -41,6 +41,15 @@ async def _run(client, args): func_args = (x, y) func = lambda x, y: x.dot(y) + elif args.operation == "cross": + x = rs.random((args.size, args.size), chunks=args.chunk_size).persist() + y = rs.random((args.size, args.size), chunks=args.chunk_size).persist() + await wait(x) + await wait(y) + + func_args = (x, y) + + func = lambda x, y: x.cross(y) elif args.operation == "svd": x = rs.random( (args.size, args.second_size), @@ -60,6 +69,18 @@ async def _run(client, args): func_args = (x,) func = lambda x: np.fft.fft(x, axis=0) + elif args.operation == "sum": + x = rs.random((args.size, args.size), chunks=args.chunk_size).persist() + await wait(x) + func_args = (x,) + + func = lambda x: x.sum() + elif args.operation == "mean": + x = rs.random((args.size, args.size), chunks=args.chunk_size).persist() + await wait(x) + func_args = (x,) + + func = lambda x: x.mean() shape = x.shape chunksize = x.chunksize @@ -213,7 +234,7 @@ def parse_args(): "default": "transpose_sum", "type": str, "help": "The operation to run, valid options are: " - "'transpose_sum' (default), 'dot', 'fft', 'svd'.", + "'transpose_sum' (default), 'dot', 'cross', 'fft', 'svd', 'sum', 'mean'.", }, { "name": ["-c", "--chunk-size",], From 84703ecbce21d10dca63cbbbd9401951fa1a4cd1 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 11 Feb 2021 10:11:49 -0800 Subject: [PATCH 2/7] Resorting packages --- dask_cuda/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index f7ee2361e..4228d21e3 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -1,8 +1,9 @@ import dask import dask.dataframe.shuffle -from .explicit_comms.dataframe.shuffle import get_rearrange_by_column_tasks_wrapper + from ._version import get_versions from .cuda_worker import CUDAWorker +from .explicit_comms.dataframe.shuffle import get_rearrange_by_column_tasks_wrapper from .local_cuda_cluster import LocalCUDACluster __version__ = get_versions()["version"] From c68fed9bd770fd208086e915204b84e389fc3f70 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 11 Feb 2021 10:16:18 -0800 Subject: [PATCH 3/7] Add option to output JSON --- dask_cuda/benchmarks/local_cupy.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py index 79ede7a71..da0a7082d 100644 --- a/dask_cuda/benchmarks/local_cupy.py +++ b/dask_cuda/benchmarks/local_cupy.py @@ -200,6 +200,26 @@ async def run(args): ) print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)])) + if args.benchmark_json: + import json + + d = { + "operation": args.operation, + "size": args.size, + "second_size": args.second_size, + "chunk_size": args.chunk_size, + "compute_size": size, + "compute_chunk_size": chunksize, + "ignore_size": format_bytes(args.ignore_size), + "protocol": args.protocol, + "devs": args.devs, + "threads_per_worker": args.threads_per_worker, + "times": took_list, + "bandwiths": sorted(bandwidths.items()), + } + with open(args.benchmark_json, "w") as fp: + json.dump(d, fp, indent=2) + # An SSHCluster will not automatically shut down, we have to # ensure it does. if args.multi_node: @@ -250,6 +270,12 @@ def parse_args(): "help": "Ignore messages smaller than this (default '1 MB')", }, {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",}, + { + "name": "--benchmark-json", + "default": None, + "type": str, + "help": "Dump a JSON report of benchmarks (optional).", + }, ] return parse_benchmark_args( From 26ed1ce27c93d45e48e5173d940456b274dca526 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 11 Feb 2021 10:57:14 -0800 Subject: [PATCH 4/7] Add slice operation --- dask_cuda/benchmarks/local_cupy.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py index da0a7082d..7cf0f664f 100644 --- a/dask_cuda/benchmarks/local_cupy.py +++ b/dask_cuda/benchmarks/local_cupy.py @@ -81,6 +81,12 @@ async def _run(client, args): func_args = (x,) func = lambda x: x.mean() + elif args.operation == "slice": + x = rs.random((args.size, args.size), chunks=args.chunk_size).persist() + await wait(x) + func_args = (x,) + + func = lambda x: x[::3].copy() shape = x.shape chunksize = x.chunksize @@ -254,7 +260,7 @@ def parse_args(): "default": "transpose_sum", "type": str, "help": "The operation to run, valid options are: " - "'transpose_sum' (default), 'dot', 'cross', 'fft', 'svd', 'sum', 'mean'.", + "'transpose_sum' (default), 'dot', 'cross', 'fft', 'svd', 'sum', 'mean', 'slice'.", }, { "name": ["-c", "--chunk-size",], From 58c2cc96beabf41b9869885e851db629154c102d Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 11 Feb 2021 12:23:03 -0800 Subject: [PATCH 5/7] Remove nonexistent cross operation --- dask_cuda/benchmarks/local_cupy.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py index 7cf0f664f..a4219fd8c 100644 --- a/dask_cuda/benchmarks/local_cupy.py +++ b/dask_cuda/benchmarks/local_cupy.py @@ -41,15 +41,6 @@ async def _run(client, args): func_args = (x, y) func = lambda x, y: x.dot(y) - elif args.operation == "cross": - x = rs.random((args.size, args.size), chunks=args.chunk_size).persist() - y = rs.random((args.size, args.size), chunks=args.chunk_size).persist() - await wait(x) - await wait(y) - - func_args = (x, y) - - func = lambda x, y: x.cross(y) elif args.operation == "svd": x = rs.random( (args.size, args.second_size), @@ -260,7 +251,7 @@ def parse_args(): "default": "transpose_sum", "type": str, "help": "The operation to run, valid options are: " - "'transpose_sum' (default), 'dot', 'cross', 'fft', 'svd', 'sum', 'mean', 'slice'.", + "'transpose_sum' (default), 'dot', 'fft', 'svd', 'sum', 'mean', 'slice'.", }, { "name": ["-c", "--chunk-size",], From ff03dec51632f498f3bd535589d3ac4f0e575c9a Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 11 Feb 2021 16:55:29 -0800 Subject: [PATCH 6/7] Clean up JSON output --- dask_cuda/benchmarks/local_cupy.py | 31 ++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py index a4219fd8c..e56393a35 100644 --- a/dask_cuda/benchmarks/local_cupy.py +++ b/dask_cuda/benchmarks/local_cupy.py @@ -211,8 +211,22 @@ async def run(args): "protocol": args.protocol, "devs": args.devs, "threads_per_worker": args.threads_per_worker, - "times": took_list, - "bandwiths": sorted(bandwidths.items()), + "times": [ + {"wall_clock": took, "npartitions": npartitions} + for (took, npartitions) in took_list + ], + "bandwidths": { + f"({d1},{d2})" + if args.multi_node or args.sched_addr + else "(%02d,%02d)" + % (d1, d2): { + "25%": bw[0], + "50%": bw[1], + "75%": bw[2], + "total_nbytes": total_nbytes[(d1, d2)], + } + for (d1, d2), bw in sorted(bandwidths.items()) + }, } with open(args.benchmark_json, "w") as fp: json.dump(d, fp, indent=2) @@ -244,7 +258,7 @@ def parse_args(): "choices": ["cpu", "gpu"], "default": "gpu", "type": str, - "help": "Do merge with GPU or CPU dataframes", + "help": "Do merge with GPU or CPU dataframes.", }, { "name": ["-o", "--operation",], @@ -257,16 +271,21 @@ def parse_args(): "name": ["-c", "--chunk-size",], "default": "2500", "type": int, - "help": "Chunk size (default 2500)", + "help": "Chunk size (default 2500).", }, { "name": "--ignore-size", "default": "1 MiB", "metavar": "nbytes", "type": parse_bytes, - "help": "Ignore messages smaller than this (default '1 MB')", + "help": "Ignore messages smaller than this (default '1 MB').", + }, + { + "name": "--runs", + "default": 3, + "type": int, + "help": "Number of runs (default 3).", }, - {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",}, { "name": "--benchmark-json", "default": None, From b1b45e573653c6789b6a0601e17193c3f2a27f37 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 12 Feb 2021 07:33:38 -0800 Subject: [PATCH 7/7] Move json import to top of file --- dask_cuda/benchmarks/local_cupy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py index e56393a35..c8259cecb 100644 --- a/dask_cuda/benchmarks/local_cupy.py +++ b/dask_cuda/benchmarks/local_cupy.py @@ -1,5 +1,6 @@ import asyncio from collections import defaultdict +from json import dump from time import perf_counter as clock from warnings import filterwarnings @@ -198,7 +199,6 @@ async def run(args): print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)])) if args.benchmark_json: - import json d = { "operation": args.operation, @@ -229,7 +229,7 @@ async def run(args): }, } with open(args.benchmark_json, "w") as fp: - json.dump(d, fp, indent=2) + dump(d, fp, indent=2) # An SSHCluster will not automatically shut down, we have to # ensure it does.