Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
Browse files Browse the repository at this point in the history
…est-practices
  • Loading branch information
rjzamora committed Sep 18, 2024
2 parents eee37f3 + 44a9c10 commit 7c63c7e
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 4 deletions.
87 changes: 85 additions & 2 deletions cpp/benchmarks/io/parquet/parquet_reader_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ constexpr cudf::size_type num_cols = 64;
void parquet_read_common(cudf::size_type num_rows_to_read,
cudf::size_type num_cols_to_read,
cuio_source_sink_pair& source_sink,
nvbench::state& state)
nvbench::state& state,
size_t table_data_size = data_size)
{
cudf::io::parquet_reader_options read_opts =
cudf::io::parquet_reader_options::builder(source_sink.make_source_info());
Expand All @@ -52,7 +53,7 @@ void parquet_read_common(cudf::size_type num_rows_to_read,
});

auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
state.add_element_count(static_cast<double>(table_data_size) / time, "bytes_per_second");
state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size");
Expand Down Expand Up @@ -231,6 +232,70 @@ void BM_parquet_read_chunks(nvbench::state& state, nvbench::type_list<nvbench::e
state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size");
}

template <data_type DataType>
void BM_parquet_read_wide_tables(nvbench::state& state,
nvbench::type_list<nvbench::enum_type<DataType>> type_list)
{
auto const d_type = get_type_or_group(static_cast<int32_t>(DataType));

auto const n_col = static_cast<cudf::size_type>(state.get_int64("num_cols"));
auto const data_size_bytes = static_cast<size_t>(state.get_int64("data_size_mb") << 20);
auto const cardinality = static_cast<cudf::size_type>(state.get_int64("cardinality"));
auto const run_length = static_cast<cudf::size_type>(state.get_int64("run_length"));
auto const source_type = io_type::DEVICE_BUFFER;
cuio_source_sink_pair source_sink(source_type);

auto const num_rows_written = [&]() {
auto const tbl = create_random_table(
cycle_dtypes(d_type, n_col),
table_size_bytes{data_size_bytes},
data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
auto const view = tbl->view();

cudf::io::parquet_writer_options write_opts =
cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view)
.compression(cudf::io::compression_type::NONE);
cudf::io::write_parquet(write_opts);
return view.num_rows();
}();

parquet_read_common(num_rows_written, n_col, source_sink, state, data_size_bytes);
}

void BM_parquet_read_wide_tables_mixed(nvbench::state& state)
{
auto const d_type = []() {
auto d_type1 = get_type_or_group(static_cast<int32_t>(data_type::INTEGRAL));
auto d_type2 = get_type_or_group(static_cast<int32_t>(data_type::FLOAT));
d_type1.reserve(d_type1.size() + d_type2.size());
std::move(d_type2.begin(), d_type2.end(), std::back_inserter(d_type1));
return d_type1;
}();

auto const n_col = static_cast<cudf::size_type>(state.get_int64("num_cols"));
auto const data_size_bytes = static_cast<size_t>(state.get_int64("data_size_mb") << 20);
auto const cardinality = static_cast<cudf::size_type>(state.get_int64("cardinality"));
auto const run_length = static_cast<cudf::size_type>(state.get_int64("run_length"));
auto const source_type = io_type::DEVICE_BUFFER;
cuio_source_sink_pair source_sink(source_type);

auto const num_rows_written = [&]() {
auto const tbl = create_random_table(
cycle_dtypes(d_type, n_col),
table_size_bytes{data_size_bytes},
data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
auto const view = tbl->view();

cudf::io::parquet_writer_options write_opts =
cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view)
.compression(cudf::io::compression_type::NONE);
cudf::io::write_parquet(write_opts);
return view.num_rows();
}();

parquet_read_common(num_rows_written, n_col, source_sink, state, data_size_bytes);
}

using d_type_list = nvbench::enum_type_list<data_type::INTEGRAL,
data_type::FLOAT,
data_type::DECIMAL,
Expand Down Expand Up @@ -272,6 +337,24 @@ NVBENCH_BENCH(BM_parquet_read_io_small_mixed)
.add_int64_axis("run_length", {1, 32})
.add_int64_axis("num_string_cols", {1, 2, 3});

using d_type_list_wide_table = nvbench::enum_type_list<data_type::DECIMAL, data_type::STRING>;
NVBENCH_BENCH_TYPES(BM_parquet_read_wide_tables, NVBENCH_TYPE_AXES(d_type_list_wide_table))
.set_name("parquet_read_wide_tables")
.set_min_samples(4)
.set_type_axes_names({"data_type"})
.add_int64_axis("data_size_mb", {1024, 2048, 4096})
.add_int64_axis("num_cols", {256, 512, 1024})
.add_int64_axis("cardinality", {0, 1000})
.add_int64_axis("run_length", {1, 32});

NVBENCH_BENCH(BM_parquet_read_wide_tables_mixed)
.set_name("parquet_read_wide_tables_mixed")
.set_min_samples(4)
.add_int64_axis("data_size_mb", {1024, 2048, 4096})
.add_int64_axis("num_cols", {256, 512, 1024})
.add_int64_axis("cardinality", {0, 1000})
.add_int64_axis("run_length", {1, 32});

// a benchmark for structs that only contain fixed-width types
using d_type_list_struct_only = nvbench::enum_type_list<data_type::STRUCT>;
NVBENCH_BENCH_TYPES(BM_parquet_read_fixed_width_struct, NVBENCH_TYPE_AXES(d_type_list_struct_only))
Expand Down
20 changes: 20 additions & 0 deletions docs/cudf/source/cudf_pandas/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,23 @@ To profile a script being run from the command line, pass the
```bash
python -m cudf.pandas --profile script.py
```

### cudf.pandas CLI Features

Several of the ways to provide input to the `python` interpreter also work with `python -m cudf.pandas`, such as the REPL, the `-c` flag, and reading from stdin.

Executing `python -m cudf.pandas` with no script name will enter a REPL (read-eval-print loop) similar to the behavior of the normal `python` interpreter.

The `-c` flag accepts a code string to run, like this:

```bash
$ python -m cudf.pandas -c "import pandas; print(pandas)"
<module 'pandas' (ModuleAccelerator(fast=cudf, slow=pandas))>
```

Users can also provide code to execute from stdin, like this:

```bash
$ echo "import pandas; print(pandas)" | python -m cudf.pandas
<module 'pandas' (ModuleAccelerator(fast=cudf, slow=pandas))>
```
36 changes: 34 additions & 2 deletions python/cudf/cudf/pandas/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""

import argparse
import code
import runpy
import sys
import tempfile
Expand All @@ -21,6 +22,8 @@

@contextmanager
def profile(function_profile, line_profile, fn):
if fn is None and (line_profile or function_profile):
raise RuntimeError("Enabling the profiler requires a script name.")
if line_profile:
with open(fn) as f:
lines = f.readlines()
Expand Down Expand Up @@ -54,6 +57,11 @@ def main():
dest="module",
nargs=1,
)
parser.add_argument(
"-c",
dest="cmd",
nargs=1,
)
parser.add_argument(
"--profile",
action="store_true",
Expand All @@ -72,9 +80,18 @@ def main():

args = parser.parse_args()

if args.cmd:
f = tempfile.NamedTemporaryFile(mode="w+b", suffix=".py")
f.write(args.cmd[0].encode())
f.seek(0)
args.args.insert(0, f.name)

install()
with profile(args.profile, args.line_profile, args.args[0]) as fn:
args.args[0] = fn

script_name = args.args[0] if len(args.args) > 0 else None
with profile(args.profile, args.line_profile, script_name) as fn:
if script_name is not None:
args.args[0] = fn
if args.module:
(module,) = args.module
# run the module passing the remaining arguments
Expand All @@ -85,6 +102,21 @@ def main():
# Remove ourself from argv and continue
sys.argv[:] = args.args
runpy.run_path(args.args[0], run_name="__main__")
else:
if sys.stdin.isatty():
banner = f"Python {sys.version} on {sys.platform}"
site_import = not sys.flags.no_site
if site_import:
cprt = 'Type "help", "copyright", "credits" or "license" for more information.'
banner += "\n" + cprt
else:
# Don't show prompts or banners if stdin is not a TTY
sys.ps1 = ""
sys.ps2 = ""
banner = ""

# Launch an interactive interpreter
code.interact(banner=banner, exitmsg="")


if __name__ == "__main__":
Expand Down
100 changes: 100 additions & 0 deletions python/cudf/cudf_pandas_tests/test_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import subprocess
import tempfile
import textwrap


def _run_python(*, cudf_pandas, command):
executable = "python "
if cudf_pandas:
executable += "-m cudf.pandas "
return subprocess.run(
executable + command,
shell=True,
capture_output=True,
check=True,
text=True,
)


def test_run_cudf_pandas_with_script():
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=True) as f:
code = textwrap.dedent(
"""
import pandas as pd
df = pd.DataFrame({'a': [1, 2, 3]})
print(df['a'].sum())
"""
)
f.write(code)
f.flush()

res = _run_python(cudf_pandas=True, command=f.name)
expect = _run_python(cudf_pandas=False, command=f.name)

assert res.stdout != ""
assert res.stdout == expect.stdout


def test_run_cudf_pandas_with_script_with_cmd_args():
input_args_and_code = """-c 'import pandas as pd; df = pd.DataFrame({"a": [1, 2, 3]}); print(df["a"].sum())'"""

res = _run_python(cudf_pandas=True, command=input_args_and_code)
expect = _run_python(cudf_pandas=False, command=input_args_and_code)

assert res.stdout != ""
assert res.stdout == expect.stdout


def test_run_cudf_pandas_with_script_with_cmd_args_check_cudf():
"""Verify that cudf is active with -m cudf.pandas."""
input_args_and_code = """-c 'import pandas as pd; print(pd)'"""

res = _run_python(cudf_pandas=True, command=input_args_and_code)
expect = _run_python(cudf_pandas=False, command=input_args_and_code)

assert "cudf" in res.stdout
assert "cudf" not in expect.stdout


def test_cudf_pandas_script_repl():
def start_repl_process(cmd):
return subprocess.Popen(
cmd.split(),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True,
)

def get_repl_output(process, commands):
for command in commands:
process.stdin.write(command)
process.stdin.flush()
return process.communicate()

p1 = start_repl_process("python -m cudf.pandas")
p2 = start_repl_process("python")
commands = [
"import pandas as pd\n",
"print(pd.Series(range(2)).sum())\n",
"print(pd.Series(range(5)).sum())\n",
"import sys\n",
"print(pd.Series(list('abcd')), out=sys.stderr)\n",
]

res = get_repl_output(p1, commands)
expect = get_repl_output(p2, commands)

# Check stdout
assert res[0] != ""
assert res[0] == expect[0]

# Check stderr
assert res[1] != ""
assert res[1] == expect[1]

p1.kill()
p2.kill()

0 comments on commit 7c63c7e

Please sign in to comment.