-
Notifications
You must be signed in to change notification settings - Fork 94
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Benchmark] Add parquet read benchmark (#1371)
Adds new benchmark for parquet read performance using a `LocalCUDACluster`. The user can pass in `--key` and `--secret` options to specify S3 credentials. E.g. ``` $ python ./local_read_parquet.py --devs 0,1,2,3,4,5,6,7 --filesystem fsspec --type gpu --file-count 48 --aggregate-files Parquet read benchmark -------------------------------------------------------------------------------- Path | s3://dask-cudf-parquet-testing/dedup_parquet Columns | None Backend | cudf Filesystem | fsspec Blocksize | 244.14 MiB Aggregate files | True Row count | 372066 Size on disk | 1.03 GiB Number of workers | 8 ================================================================================ Wall clock | Throughput -------------------------------------------------------------------------------- 36.75 s | 28.78 MiB/s 21.29 s | 49.67 MiB/s 17.91 s | 59.05 MiB/s ================================================================================ Throughput | 41.77 MiB/s +/- 7.81 MiB/s Bandwidth | 0 B/s +/- 0 B/s Wall clock | 25.32 s +/- 8.20 s ================================================================================ ... ``` **Notes**: - S3 Performance generally scales with the number of workers (multiplied the number of threads per worker) - The example shown above was not executed from an EC2 instance - The example shown above *should* perform better after rapidsai/cudf#16657 - Using `--filesystem arrow` together with `--type gpu` performs well, but depends on rapidsai/cudf#16684 Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) - Peter Andreas Entschev (https://github.com/pentschev) URL: #1371
- Loading branch information
Showing
7 changed files
with
279 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,268 @@ | ||
import contextlib | ||
from collections import ChainMap | ||
from time import perf_counter as clock | ||
|
||
import fsspec | ||
import pandas as pd | ||
|
||
import dask | ||
import dask.dataframe as dd | ||
from dask.base import tokenize | ||
from dask.distributed import performance_report | ||
from dask.utils import format_bytes, parse_bytes | ||
|
||
from dask_cuda.benchmarks.common import Config, execute_benchmark | ||
from dask_cuda.benchmarks.utils import ( | ||
parse_benchmark_args, | ||
print_key_value, | ||
print_separator, | ||
print_throughput_bandwidth, | ||
) | ||
|
||
DISK_SIZE_CACHE = {} | ||
OPTIONS_CACHE = {} | ||
|
||
|
||
def _noop(df): | ||
return df | ||
|
||
|
||
def read_data(paths, columns, backend, **kwargs): | ||
with dask.config.set({"dataframe.backend": backend}): | ||
return dd.read_parquet( | ||
paths, | ||
columns=columns, | ||
**kwargs, | ||
) | ||
|
||
|
||
def get_fs_paths_kwargs(args): | ||
kwargs = {} | ||
|
||
storage_options = {} | ||
if args.key: | ||
storage_options["key"] = args.key | ||
if args.secret: | ||
storage_options["secret"] = args.secret | ||
|
||
if args.filesystem == "arrow": | ||
import pyarrow.fs as pa_fs | ||
from fsspec.implementations.arrow import ArrowFSWrapper | ||
|
||
_mapping = { | ||
"key": "access_key", | ||
"secret": "secret_key", | ||
} # See: pyarrow.fs.S3FileSystem docs | ||
s3_args = {} | ||
for k, v in storage_options.items(): | ||
s3_args[_mapping[k]] = v | ||
|
||
fs = pa_fs.FileSystem.from_uri(args.path)[0] | ||
try: | ||
region = {"region": fs.region} | ||
except AttributeError: | ||
region = {} | ||
kwargs["filesystem"] = type(fs)(**region, **s3_args) | ||
fsspec_fs = ArrowFSWrapper(kwargs["filesystem"]) | ||
|
||
if args.type == "gpu": | ||
kwargs["blocksize"] = args.blocksize | ||
else: | ||
fsspec_fs = fsspec.core.get_fs_token_paths( | ||
args.path, mode="rb", storage_options=storage_options | ||
)[0] | ||
kwargs["filesystem"] = fsspec_fs | ||
kwargs["blocksize"] = args.blocksize | ||
kwargs["aggregate_files"] = args.aggregate_files | ||
|
||
# Collect list of paths | ||
stripped_url_path = fsspec_fs._strip_protocol(args.path) | ||
if stripped_url_path.endswith("/"): | ||
stripped_url_path = stripped_url_path[:-1] | ||
paths = fsspec_fs.glob(f"{stripped_url_path}/*.parquet") | ||
if args.file_count: | ||
paths = paths[: args.file_count] | ||
|
||
return fsspec_fs, paths, kwargs | ||
|
||
|
||
def bench_once(client, args, write_profile=None): | ||
global OPTIONS_CACHE | ||
global DISK_SIZE_CACHE | ||
|
||
# Construct kwargs | ||
token = tokenize(args) | ||
try: | ||
fsspec_fs, paths, kwargs = OPTIONS_CACHE[token] | ||
except KeyError: | ||
fsspec_fs, paths, kwargs = get_fs_paths_kwargs(args) | ||
OPTIONS_CACHE[token] = (fsspec_fs, paths, kwargs) | ||
|
||
if write_profile is None: | ||
ctx = contextlib.nullcontext() | ||
else: | ||
ctx = performance_report(filename=args.profile) | ||
|
||
with ctx: | ||
t1 = clock() | ||
df = read_data( | ||
paths, | ||
columns=args.columns, | ||
backend="cudf" if args.type == "gpu" else "pandas", | ||
**kwargs, | ||
) | ||
num_rows = len( | ||
# Use opaque `map_partitions` call to "block" | ||
# dask-expr from using pq metadata to get length | ||
df.map_partitions( | ||
_noop, | ||
meta=df._meta, | ||
enforce_metadata=False, | ||
) | ||
) | ||
t2 = clock() | ||
|
||
# Extract total size of files on disk | ||
token = tokenize(paths) | ||
try: | ||
disk_size = DISK_SIZE_CACHE[token] | ||
except KeyError: | ||
disk_size = sum(fsspec_fs.sizes(paths)) | ||
DISK_SIZE_CACHE[token] = disk_size | ||
|
||
return (disk_size, num_rows, t2 - t1) | ||
|
||
|
||
def pretty_print_results(args, address_to_index, p2p_bw, results): | ||
if args.markdown: | ||
print("```") | ||
print("Parquet read benchmark") | ||
data_processed, row_count, durations = zip(*results) | ||
print_separator(separator="-") | ||
backend = "cudf" if args.type == "gpu" else "pandas" | ||
print_key_value(key="Path", value=args.path) | ||
print_key_value(key="Columns", value=f"{args.columns}") | ||
print_key_value(key="Backend", value=f"{backend}") | ||
print_key_value(key="Filesystem", value=f"{args.filesystem}") | ||
print_key_value(key="Blocksize", value=f"{format_bytes(args.blocksize)}") | ||
print_key_value(key="Aggregate files", value=f"{args.aggregate_files}") | ||
print_key_value(key="Row count", value=f"{row_count[0]}") | ||
print_key_value(key="Size on disk", value=f"{format_bytes(data_processed[0])}") | ||
if args.markdown: | ||
print("\n```") | ||
args.no_show_p2p_bandwidth = True | ||
print_throughput_bandwidth( | ||
args, durations, data_processed, p2p_bw, address_to_index | ||
) | ||
print_separator(separator="=") | ||
|
||
|
||
def create_tidy_results(args, p2p_bw, results): | ||
configuration = { | ||
"path": args.path, | ||
"columns": args.columns, | ||
"backend": "cudf" if args.type == "gpu" else "pandas", | ||
"filesystem": args.filesystem, | ||
"blocksize": args.blocksize, | ||
"aggregate_files": args.aggregate_files, | ||
} | ||
timing_data = pd.DataFrame( | ||
[ | ||
pd.Series( | ||
data=ChainMap( | ||
configuration, | ||
{ | ||
"wallclock": duration, | ||
"data_processed": data_processed, | ||
"num_rows": num_rows, | ||
}, | ||
) | ||
) | ||
for data_processed, num_rows, duration in results | ||
] | ||
) | ||
return timing_data, p2p_bw | ||
|
||
|
||
def parse_args(): | ||
special_args = [ | ||
{ | ||
"name": "path", | ||
"type": str, | ||
"help": "Parquet directory to read from (must be a flat directory).", | ||
}, | ||
{ | ||
"name": "--blocksize", | ||
"default": "256MB", | ||
"type": parse_bytes, | ||
"help": "How to set the blocksize option", | ||
}, | ||
{ | ||
"name": "--aggregate-files", | ||
"default": False, | ||
"action": "store_true", | ||
"help": "How to set the aggregate_files option", | ||
}, | ||
{ | ||
"name": "--file-count", | ||
"type": int, | ||
"help": "Maximum number of files to read.", | ||
}, | ||
{ | ||
"name": "--columns", | ||
"type": str, | ||
"help": "Columns to read/select from data.", | ||
}, | ||
{ | ||
"name": "--key", | ||
"type": str, | ||
"help": "Public S3 key.", | ||
}, | ||
{ | ||
"name": "--secret", | ||
"type": str, | ||
"help": "Secret S3 key.", | ||
}, | ||
{ | ||
"name": [ | ||
"-t", | ||
"--type", | ||
], | ||
"choices": ["cpu", "gpu"], | ||
"default": "gpu", | ||
"type": str, | ||
"help": "Use GPU or CPU dataframes (default 'gpu')", | ||
}, | ||
{ | ||
"name": "--filesystem", | ||
"choices": ["arrow", "fsspec"], | ||
"default": "fsspec", | ||
"type": str, | ||
"help": "Filesystem backend", | ||
}, | ||
{ | ||
"name": "--runs", | ||
"default": 3, | ||
"type": int, | ||
"help": "Number of runs", | ||
}, | ||
] | ||
|
||
args = parse_benchmark_args( | ||
description="Parquet read benchmark", | ||
args_list=special_args, | ||
check_explicit_comms=False, | ||
) | ||
args.no_show_p2p_bandwidth = True | ||
return args | ||
|
||
|
||
if __name__ == "__main__": | ||
execute_benchmark( | ||
Config( | ||
args=parse_args(), | ||
bench_once=bench_once, | ||
create_tidy_results=create_tidy_results, | ||
pretty_print_results=pretty_print_results, | ||
) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters