Add operations/arguments to local CuPy array benchmark #524

Merged
7 commits merged on Feb 12, 2021
3 changes: 2 additions & 1 deletion dask_cuda/__init__.py
@@ -1,8 +1,9 @@
import dask
import dask.dataframe.shuffle
from .explicit_comms.dataframe.shuffle import get_rearrange_by_column_tasks_wrapper

from ._version import get_versions
from .cuda_worker import CUDAWorker
from .explicit_comms.dataframe.shuffle import get_rearrange_by_column_tasks_wrapper
from .local_cuda_cluster import LocalCUDACluster

__version__ = get_versions()["version"]
73 changes: 68 additions & 5 deletions dask_cuda/benchmarks/local_cupy.py
@@ -60,6 +60,24 @@ async def _run(client, args):
func_args = (x,)

func = lambda x: np.fft.fft(x, axis=0)
elif args.operation == "sum":
x = rs.random((args.size, args.size), chunks=args.chunk_size).persist()
await wait(x)
func_args = (x,)

func = lambda x: x.sum()
elif args.operation == "mean":
x = rs.random((args.size, args.size), chunks=args.chunk_size).persist()
await wait(x)
func_args = (x,)

func = lambda x: x.mean()
elif args.operation == "slice":
x = rs.random((args.size, args.size), chunks=args.chunk_size).persist()
await wait(x)
func_args = (x,)

func = lambda x: x[::3].copy()

shape = x.shape
chunksize = x.chunksize
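
For orientation, the three new operations map onto plain Dask array calls; below is a minimal sketch, assuming a NumPy-backed Dask array for illustration (the benchmark itself builds its arrays from rs and submits the lambdas through the client):

# Minimal sketch of the "sum", "mean" and "slice" operations added above,
# run eagerly on a NumPy-backed Dask array (illustrative only).
import dask.array as da

x = da.random.random((2500, 2500), chunks=(250, 250))

total = x.sum().compute()         # "sum": reduce the whole array to a scalar
avg = x.mean().compute()          # "mean": mean over all chunks
sliced = x[::3].copy().compute()  # "slice": every third row, then a copy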
@@ -179,6 +197,40 @@ async def run(args):
)
print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))

if args.benchmark_json:
import json
Member: Moving import json to the top should be fine

Member Author: Done! Also going to change this to from json import dump.
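
A minimal sketch of the follow-up described in that reply, assuming the import is hoisted to the top of local_cupy.py (this placement is not part of the current revision of the diff):

# Sketch of the discussed follow-up: module-level import, dump() at the call site.
from json import dump  # at the top of local_cupy.py, replacing the local "import json"

# ... later, inside run():
with open(args.benchmark_json, "w") as fp:
    dump(d, fp, indent=2)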


d = {
"operation": args.operation,
"size": args.size,
"second_size": args.second_size,
"chunk_size": args.chunk_size,
"compute_size": size,
"compute_chunk_size": chunksize,
"ignore_size": format_bytes(args.ignore_size),
"protocol": args.protocol,
"devs": args.devs,
"threads_per_worker": args.threads_per_worker,
"times": [
{"wall_clock": took, "npartitions": npartitions}
for (took, npartitions) in took_list
],
"bandwidths": {
f"({d1},{d2})"
if args.multi_node or args.sched_addr
else "(%02d,%02d)"
% (d1, d2): {
"25%": bw[0],
"50%": bw[1],
"75%": bw[2],
"total_nbytes": total_nbytes[(d1, d2)],
}
for (d1, d2), bw in sorted(bandwidths.items())
},
}
with open(args.benchmark_json, "w") as fp:
json.dump(d, fp, indent=2)

# An SSHCluster will not automatically shut down, we have to
# ensure it does.
if args.multi_node:
@@ -206,29 +258,40 @@ def parse_args():
"choices": ["cpu", "gpu"],
"default": "gpu",
"type": str,
"help": "Do merge with GPU or CPU dataframes",
"help": "Do merge with GPU or CPU dataframes.",
},
{
"name": ["-o", "--operation",],
"default": "transpose_sum",
"type": str,
"help": "The operation to run, valid options are: "
"'transpose_sum' (default), 'dot', 'fft', 'svd'.",
"'transpose_sum' (default), 'dot', 'fft', 'svd', 'sum', 'mean', 'slice'.",
},
{
"name": ["-c", "--chunk-size",],
"default": "2500",
"type": int,
"help": "Chunk size (default 2500)",
"help": "Chunk size (default 2500).",
},
{
"name": "--ignore-size",
"default": "1 MiB",
"metavar": "nbytes",
"type": parse_bytes,
"help": "Ignore messages smaller than this (default '1 MB')",
"help": "Ignore messages smaller than this (default '1 MB').",
},
{
"name": "--runs",
"default": 3,
"type": int,
"help": "Number of runs (default 3).",
},
{
"name": "--benchmark-json",
"default": None,
"type": str,
"help": "Dump a JSON report of benchmarks (optional).",
},
{"name": "--runs", "default": 3, "type": int, "help": "Number of runs",},
]

return parse_benchmark_args(
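
To tie the new options together, here is a minimal sketch of consuming a report produced with --benchmark-json. The command line in the comment is an assumed invocation of the benchmark script; the keys match the dictionary written by json.dump above:

# Minimal sketch of reading the optional JSON report (assumed invocation shown):
#
#     python dask_cuda/benchmarks/local_cupy.py -o sum -c 2500 --runs 3 \
#         --benchmark-json report.json
#
import json

with open("report.json") as fp:
    report = json.load(fp)

# "times" holds one entry per run, as written by the benchmark above.
wall_clocks = [run["wall_clock"] for run in report["times"]]
print(report["operation"], sum(wall_clocks) / len(wall_clocks))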