diff --git a/cpp/examples/billion_rows/CMakeLists.txt b/cpp/examples/billion_rows/CMakeLists.txt new file mode 100644 index 00000000000..d95bb73b258 --- /dev/null +++ b/cpp/examples/billion_rows/CMakeLists.txt @@ -0,0 +1,34 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +cmake_minimum_required(VERSION 3.26.4) + +include(../set_cuda_architecture.cmake) + +# initialize cuda architecture +rapids_cuda_init_architectures(billion_rows) +rapids_cuda_set_architectures(RAPIDS) + +project( + billion_rows + VERSION 0.0.1 + LANGUAGES CXX CUDA +) + +include(../fetch_dependencies.cmake) + +list(APPEND CUDF_CUDA_FLAGS --expt-extended-lambda --expt-relaxed-constexpr) + +add_library(groupby_results OBJECT groupby_results.cpp) +target_link_libraries(groupby_results PRIVATE cudf::cudf) + +add_executable(brc brc.cpp) +target_link_libraries(brc PRIVATE cudf::cudf nvToolsExt $) +install(TARGETS brc DESTINATION bin/examples/libcudf) + +add_executable(brc_chunks brc_chunks.cpp) +target_link_libraries(brc_chunks PRIVATE cudf::cudf nvToolsExt $) +install(TARGETS brc_chunks DESTINATION bin/examples/libcudf) + +add_executable(brc_pipeline brc_pipeline.cpp) +target_link_libraries(brc_pipeline PRIVATE cudf::cudf nvToolsExt $) +install(TARGETS brc_pipeline DESTINATION bin/examples/libcudf) diff --git a/cpp/examples/billion_rows/README.md b/cpp/examples/billion_rows/README.md new file mode 100644 index 00000000000..73ff7aa19f0 --- /dev/null +++ b/cpp/examples/billion_rows/README.md @@ -0,0 +1,44 @@ +# libcudf C++ example for the 1 billion row challenge + +This C++ example demonstrates using libcudf APIs to read and process +a table with 1 billion rows. The 1 billion row challenge is described here: +https://github.com/gunnarmorling/1brc + +The examples load the 1 billion row text file using the CSV reader. +The file contains around 400 unique city names (string type) along with +random temperature values (float type). +Once loaded, the examples performs groupby aggregations to find the +minimum, maximum, and average temperature for each city. + +There are three examples included: +1. `brc.cpp` + Loads the file in one call to the CSV reader. + This generally requires a large amount of available GPU memory. +2. `brc_chunks.cpp` + Loads and processes the file in chunks. + The number of chunks to use is a parameter to the executable. +3. `brc_pipeline.cpp` + Loads and processes the file in chunks with separate threads/streams. + The number of chunks and number of threads to use are parameters to the executable. + +An input file can be generated using the instructions from +https://github.com/gunnarmorling/1brc. + +## Compile and execute + +```bash +# Configure project +cmake -S . -B build/ +# Build +cmake --build build/ --parallel $PARALLEL_LEVEL +# Execute +build/brc input.txt +# Execute in chunked mode with 25 chunks (default) +build/brc_chunks input.txt 25 +# Execute in pipeline mode with 25 chunks and 2 threads (defaults) +build/brc_pipeline input.txt 25 2 +``` + +If your machine does not come with a pre-built libcudf binary, expect the +first build to take some time, as it would build libcudf on the host machine. +It may be sped up by configuring the proper `PARALLEL_LEVEL` number. diff --git a/cpp/examples/billion_rows/brc.cpp b/cpp/examples/billion_rows/brc.cpp new file mode 100644 index 00000000000..b7b292cf16e --- /dev/null +++ b/cpp/examples/billion_rows/brc.cpp @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "common.hpp" +#include "groupby_results.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +using elapsed_t = std::chrono::duration; + +int main(int argc, char const** argv) +{ + if (argc < 2) { + std::cout << "required parameter: input-file-path\n"; + return 1; + } + + auto const input_file = std::string{argv[1]}; + std::cout << "Input: " << input_file << std::endl; + + auto const mr_name = std::string("pool"); + auto resource = create_memory_resource(mr_name); + auto stats_mr = + rmm::mr::statistics_resource_adaptor(resource.get()); + rmm::mr::set_current_device_resource(&stats_mr); + auto stream = cudf::get_default_stream(); + + auto start = std::chrono::steady_clock::now(); + + auto const csv_result = [input_file, stream] { + cudf::io::csv_reader_options in_opts = + cudf::io::csv_reader_options::builder(cudf::io::source_info{input_file}) + .header(-1) + .delimiter(';') + .doublequote(false) + .dtypes(std::vector{cudf::data_type{cudf::type_id::STRING}, + cudf::data_type{cudf::type_id::FLOAT32}}) + .na_filter(false); + return cudf::io::read_csv(in_opts, stream).tbl; + }(); + elapsed_t elapsed = std::chrono::steady_clock::now() - start; + std::cout << "File load time: " << elapsed.count() << " seconds\n"; + auto const csv_table = csv_result->view(); + std::cout << "Input rows: " << csv_table.num_rows() << std::endl; + + auto const cities = csv_table.column(0); + auto const temps = csv_table.column(1); + + std::vector> aggregations; + aggregations.emplace_back(cudf::make_min_aggregation()); + aggregations.emplace_back(cudf::make_max_aggregation()); + aggregations.emplace_back(cudf::make_mean_aggregation()); + + auto result = compute_results(cities, temps, std::move(aggregations), stream); + + // The other 2 examples employ sorting for the sub-aggregates so enabling + // the following line may be more comparable in performance with them. + // + // result = cudf::sort_by_key(result->view(), result->view().select({0}), {}, {}, stream); + + stream.synchronize(); + + elapsed = std::chrono::steady_clock::now() - start; + std::cout << "Number of keys: " << result->num_rows() << std::endl; + std::cout << "Process time: " << elapsed.count() << " seconds\n"; + std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1048576.0) << " MB\n"; + + return 0; +} diff --git a/cpp/examples/billion_rows/brc_chunks.cpp b/cpp/examples/billion_rows/brc_chunks.cpp new file mode 100644 index 00000000000..4a65c59e461 --- /dev/null +++ b/cpp/examples/billion_rows/brc_chunks.cpp @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "common.hpp" +#include "groupby_results.hpp" + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +using elapsed_t = std::chrono::duration; + +std::unique_ptr load_chunk(std::string const& input_file, + std::size_t start, + std::size_t size, + rmm::cuda_stream_view stream) +{ + cudf::io::csv_reader_options in_opts = + cudf::io::csv_reader_options::builder(cudf::io::source_info{input_file}) + .header(-1) + .delimiter(';') + .doublequote(false) + .byte_range_offset(start) + .byte_range_size(size) + .dtypes(std::vector{cudf::data_type{cudf::type_id::STRING}, + cudf::data_type{cudf::type_id::FLOAT32}}) + .na_filter(false); + return cudf::io::read_csv(in_opts, stream).tbl; +} + +int main(int argc, char const** argv) +{ + if (argc < 2) { + std::cout << "required parameter: input-file-path\n"; + std::cout << "optional parameter: chunk-count\n"; + return 1; + } + + auto const input_file = std::string{argv[1]}; + auto const divider = (argc < 3) ? 25 : std::stoi(std::string(argv[2])); + + std::cout << "Input: " << input_file << std::endl; + std::cout << "Chunks: " << divider << std::endl; + + auto const mr_name = std::string("pool"); + auto resource = create_memory_resource(mr_name); + auto stats_mr = + rmm::mr::statistics_resource_adaptor(resource.get()); + rmm::mr::set_current_device_resource(&stats_mr); + auto stream = cudf::get_default_stream(); + + std::filesystem::path p = input_file; + auto const file_size = std::filesystem::file_size(p); + + auto start = std::chrono::steady_clock::now(); + + std::vector> agg_data; + std::size_t chunk_size = file_size / divider + ((file_size % divider) != 0); + std::size_t start_pos = 0; + cudf::size_type total_rows = 0; + do { + auto const input_table = load_chunk(input_file, start_pos, chunk_size, stream); + auto const read_rows = input_table->num_rows(); + if (read_rows == 0) break; + + auto const cities = input_table->view().column(0); + auto const temps = input_table->view().column(1); + + std::vector> aggregations; + aggregations.emplace_back(cudf::make_min_aggregation()); + aggregations.emplace_back(cudf::make_max_aggregation()); + aggregations.emplace_back(cudf::make_sum_aggregation()); + aggregations.emplace_back(cudf::make_count_aggregation()); + auto result = compute_results(cities, temps, std::move(aggregations), stream); + + agg_data.emplace_back( + cudf::sort_by_key(result->view(), result->view().select({0}), {}, {}, stream)); + start_pos += chunk_size; + chunk_size = std::min(chunk_size, file_size - start_pos); + total_rows += read_rows; + } while (start_pos < file_size && chunk_size > 0); + + // now aggregate the aggregate results + auto results = compute_final_aggregates(agg_data, stream); + stream.synchronize(); + + elapsed_t elapsed = std::chrono::steady_clock::now() - start; + std::cout << "Number of keys: " << results->num_rows() << std::endl; + std::cout << "Process time: " << elapsed.count() << " seconds\n"; + std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1048576.0) << " MB\n"; + + return 0; +} diff --git a/cpp/examples/billion_rows/brc_pipeline.cpp b/cpp/examples/billion_rows/brc_pipeline.cpp new file mode 100644 index 00000000000..c65edc163e1 --- /dev/null +++ b/cpp/examples/billion_rows/brc_pipeline.cpp @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "common.hpp" +#include "groupby_results.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +using elapsed_t = std::chrono::duration; +using byte_range = std::pair; +using result_t = std::unique_ptr; + +std::unique_ptr load_chunk(std::string const& input_file, + std::size_t start, + std::size_t size, + rmm::cuda_stream_view stream) +{ + cudf::io::csv_reader_options in_opts = + cudf::io::csv_reader_options::builder(cudf::io::source_info{input_file}) + .header(-1) + .delimiter(';') + .doublequote(false) + .byte_range_offset(start) + .byte_range_size(size) + .dtypes(std::vector{cudf::data_type{cudf::type_id::STRING}, + cudf::data_type{cudf::type_id::FLOAT32}}) + .na_filter(false); + return cudf::io::read_csv(in_opts, stream).tbl; +} + +struct chunk_fn { + std::string input_file; + std::vector& agg_data; + rmm::cuda_stream_view stream; + + std::vector byte_ranges{}; + bool first_range{}; + + void add_range(std::size_t start, std::size_t size) + { + byte_ranges.push_back(byte_range{start, size}); + if (!first_range) { first_range = (start == 0); } + } + + void operator()() + { + using namespace std::chrono_literals; + + // process each byte range assigned to this thread + for (auto& br : byte_ranges) { + auto const input_table = load_chunk(input_file, br.first, br.second, stream); + auto const read_rows = input_table->num_rows(); + if (read_rows == 0) continue; + + auto const cities = input_table->view().column(0); + auto const temps = input_table->view().column(1); + + std::vector> aggregations; + aggregations.emplace_back(cudf::make_min_aggregation()); + aggregations.emplace_back(cudf::make_max_aggregation()); + aggregations.emplace_back(cudf::make_sum_aggregation()); + aggregations.emplace_back(cudf::make_count_aggregation()); + auto result = compute_results(cities, temps, std::move(aggregations), stream); + + agg_data.emplace_back( + cudf::sort_by_key(result->view(), result->view().select({0}), {}, {}, stream)); + } + // done with this stream + stream.synchronize_no_throw(); + } +}; + +int main(int argc, char const** argv) +{ + if (argc < 2) { + std::cout << "required parameter: input-file-path\n"; + std::cout << "optional parameters: chunk-count thread-count\n"; + return 1; + } + + auto const input_file = std::string{argv[1]}; + auto const divider = (argc < 3) ? 25 : std::stoi(std::string(argv[2])); + auto const thread_count = (argc < 4) ? 2 : std::stoi(std::string(argv[3])); + + std::cout << "Input: " << input_file << std::endl; + std::cout << "Chunks: " << divider << std::endl; + std::cout << "Threads: " << thread_count << std::endl; + + auto const mr_name = std::string("pool"); + auto resource = create_memory_resource(mr_name); + auto stats_mr = + rmm::mr::statistics_resource_adaptor(resource.get()); + rmm::mr::set_current_device_resource(&stats_mr); + auto stream = cudf::get_default_stream(); + + std::filesystem::path p = input_file; + auto const file_size = std::filesystem::file_size(p); + + auto start = std::chrono::steady_clock::now(); + + std::size_t chunk_size = file_size / divider + ((file_size % divider) != 0); + std::size_t start_pos = 0; + + auto stream_pool = rmm::cuda_stream_pool(thread_count); + std::vector> chunk_results(thread_count); + + std::vector chunk_tasks; + for (auto& cr : chunk_results) { + chunk_tasks.emplace_back(chunk_fn{input_file, cr, stream_pool.get_stream()}); + } + for (std::size_t i = 0; i < divider; ++i) { + auto start = i * chunk_size; + auto size = std::min(chunk_size, file_size - start); + chunk_tasks[i % thread_count].add_range(start, size); + } + std::vector threads; + for (auto& c : chunk_tasks) { + threads.emplace_back(std::thread{c}); + } + for (auto& t : threads) { + t.join(); + } + + // in case some kernels are still running on the default stream + stream.synchronize(); + + // combine each thread's agg data into a single vector + std::vector agg_data(divider); + auto begin = agg_data.begin(); + for (auto& c : chunk_results) { + std::move(c.begin(), c.end(), begin); + begin += c.size(); + } + + // now aggregate the aggregate results + auto results = compute_final_aggregates(agg_data, stream); + stream.synchronize(); + + elapsed_t elapsed = std::chrono::steady_clock::now() - start; + std::cout << "Number of keys: " << results->num_rows() << std::endl; + std::cout << "Process time: " << elapsed.count() << " seconds\n"; + std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1048576.0) << " MB\n"; + + return 0; +} diff --git a/cpp/examples/billion_rows/common.hpp b/cpp/examples/billion_rows/common.hpp new file mode 100644 index 00000000000..d3063034d28 --- /dev/null +++ b/cpp/examples/billion_rows/common.hpp @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include + +#include + +/** + * @brief Create CUDA memory resource + */ +auto make_cuda_mr() { return std::make_shared(); } + +/** + * @brief Create a pool device memory resource + */ +auto make_pool_mr() +{ + return rmm::mr::make_owning_wrapper( + make_cuda_mr(), rmm::percent_of_free_device_memory(50)); +} + +/** + * @brief Create memory resource for libcudf functions + */ +std::shared_ptr create_memory_resource(std::string const& name) +{ + if (name == "pool") { return make_pool_mr(); } + return make_cuda_mr(); +} diff --git a/cpp/examples/billion_rows/groupby_results.cpp b/cpp/examples/billion_rows/groupby_results.cpp new file mode 100644 index 00000000000..0a7f24830f6 --- /dev/null +++ b/cpp/examples/billion_rows/groupby_results.cpp @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "groupby_results.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +std::unique_ptr compute_results( + cudf::column_view const& cities, + cudf::column_view const& temperatures, + std::vector>&& aggregations, + rmm::cuda_stream_view stream) +{ + auto groupby_obj = cudf::groupby::groupby(cudf::table_view({cities})); + auto aggregation_reqs = std::vector{}; + auto& req = aggregation_reqs.emplace_back(); + req.values = temperatures; + req.aggregations = std::move(aggregations); + + auto result = groupby_obj.aggregate(aggregation_reqs, stream); + + auto rtn = result.first->release(); + for (auto& r : result.second.front().results) { + rtn.emplace_back(std::move(r)); + } + + return std::make_unique(std::move(rtn)); +} + +std::unique_ptr compute_final_aggregates( + std::vector>& agg_data, rmm::cuda_stream_view stream) +{ + // first combine all the results into a vectors of columns + std::vector min_cols, max_cols, sum_cols, count_cols; + for (auto& tbl : agg_data) { + auto const tv = tbl->view(); + min_cols.push_back(tv.column(1)); + max_cols.push_back(tv.column(2)); + sum_cols.push_back(tv.column(3)); + count_cols.push_back(tv.column(4)); + } + + // Create single columns out of the aggregate table results. + // This relies on every key appearing in every chunk segment. + // All the values for each key become contiguous within the output column. + // For example, for N=min_cols.size() (number of unique cities): + // All of the mins for city[i] are in row[i] of each column of vector min_cols. + // The interleave_columns API transposes these into a single column where + // the first N rows are values for city[0], + // the next N rows are values for city[1], + // ... + // the last N rows are values for city[N-1] + // The final result for each city is computed using segmented_reduce. + auto mins = cudf::interleave_columns(cudf::table_view{min_cols}); + auto maxes = cudf::interleave_columns(cudf::table_view{max_cols}); + auto sums = cudf::interleave_columns(cudf::table_view{sum_cols}); + auto counts = cudf::interleave_columns(cudf::table_view{count_cols}); + + // Build the offsets needed for segmented reduce. + // These are increasing integer values spaced evenly as per the number of cities (keys). + auto const num_keys = agg_data.front()->num_rows(); + auto const size = static_cast(num_keys) + 1; + auto const start = cudf::numeric_scalar(0, true, stream); + auto const step = cudf::numeric_scalar(agg_data.size(), true, stream); + auto seg_offsets = cudf::sequence(size, start, step, stream); + auto offsets_span = cudf::device_span(seg_offsets->view()); + + // compute the min/max for each key by using segmented reduce + auto min_agg = cudf::make_min_aggregation(); + mins = cudf::segmented_reduce( + mins->view(), offsets_span, *min_agg, mins->type(), cudf::null_policy::EXCLUDE, stream); + auto max_agg = cudf::make_max_aggregation(); + maxes = cudf::segmented_reduce( + maxes->view(), offsets_span, *max_agg, maxes->type(), cudf::null_policy::EXCLUDE, stream); + + // compute the sum and total counts in the same way + auto sum_agg = cudf::make_sum_aggregation(); + sums = cudf::segmented_reduce( + sums->view(), offsets_span, *sum_agg, sums->type(), cudf::null_policy::EXCLUDE, stream); + counts = cudf::segmented_reduce( + counts->view(), offsets_span, *sum_agg, counts->type(), cudf::null_policy::EXCLUDE, stream); + + // compute the means using binary-operation to divide the individual rows sum/count + auto means = cudf::binary_operation( + sums->view(), counts->view(), cudf::binary_operator::DIV, sums->type(), stream); + + std::vector> results; + results.emplace_back(std::move(mins)); + results.emplace_back(std::move(maxes)); + results.emplace_back(std::move(means)); + return std::make_unique(std::move(results)); +} diff --git a/cpp/examples/billion_rows/groupby_results.hpp b/cpp/examples/billion_rows/groupby_results.hpp new file mode 100644 index 00000000000..d5a88428329 --- /dev/null +++ b/cpp/examples/billion_rows/groupby_results.hpp @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include + +#include + +/** + * @brief Process the cities and temperatures + * + * Perform the given aggregations using the cities as the keys and the + * temperatures as values. + * + * @param cities The city names + * @param temperatures The temperature values + * @param aggregations Which groupby aggregations to perform + * @param stream CUDA stream to use for launching kernels + * @return aggregated results + */ +std::unique_ptr compute_results( + cudf::column_view const& cities, + cudf::column_view const& temperatures, + std::vector>&& aggregations, + rmm::cuda_stream_view stream = cudf::get_default_stream()); + +/** + * @brief Produce the final aggregations from sub-aggregate results + * + * @param agg_data Sub-aggregations to summarize + * @param stream CUDA stream to use for launching kernels + * @return final results + */ +std::unique_ptr compute_final_aggregates( + std::vector>& agg_data, + rmm::cuda_stream_view stream = cudf::get_default_stream()); diff --git a/cpp/examples/build.sh b/cpp/examples/build.sh index 2d6f6f316c7..8e8d8bd0b78 100755 --- a/cpp/examples/build.sh +++ b/cpp/examples/build.sh @@ -61,4 +61,5 @@ build_example tpch build_example strings build_example nested_types build_example parquet_io +build_example billion_rows build_example interop