Skip to content

Commit

Permalink
Generate parquet files in device buffers
Browse files (browse the repository at this point in the history)
  • Loading branch information
JayjeetAtGithub committed Aug 12, 2024
1 parent fbb14e8 commit c99b225
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 228 deletions.
1 change: 0 additions & 1 deletion cpp/examples/tpch/dbgen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ void write_parquet(std::unique_ptr<cudf::table> table,
std::vector<std::string> const& col_names)
{
CUDF_FUNC_RANGE();
std::cout << __func__ << " : " << path << std::endl;
cudf::io::table_metadata metadata;
std::vector<cudf::io::column_name_info> col_name_infos;
for (auto& col_name : col_names) {
Expand Down
38 changes: 18 additions & 20 deletions cpp/examples/tpch/q1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
#include <cudf/column/column.hpp>
#include <cudf/scalar/scalar.hpp>

#include <cudf_benchmark/tpch_datagen.hpp>

/**
* @file q1.cpp
* @brief Implement query 1 of the TPC-H benchmark.
Expand Down Expand Up @@ -66,6 +64,7 @@
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource())
{
CUDF_FUNC_RANGE();
auto const one = cudf::numeric_scalar<double>(1);
auto const one_minus_discount =
cudf::binary_operation(one, discount, cudf::binary_operator::SUB, discount.type(), stream, mr);
Expand Down Expand Up @@ -93,6 +92,7 @@
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource())
{
CUDF_FUNC_RANGE();
auto const one = cudf::numeric_scalar<double>(1);
auto const one_plus_tax =
cudf::binary_operation(one, tax, cudf::binary_operator::ADD, tax.type(), stream, mr);
Expand All @@ -103,13 +103,15 @@
}

/**
* @brief Generate or read the dataset
* @brief Read out the dataset from a map of parquet data sources
*
* @param source The dataset source
* @param sources The map of parquet data sources
*/
[[nodiscard]] std::unique_ptr<table_with_names> prepare_dataset(std::string source)
[[nodiscard]] std::unique_ptr<table_with_names> read_parquet_sources(
std::unordered_map<std::string, parquet_source>& sources)
{
// Define the column projections and filter predicate for `lineitem` table
CUDF_FUNC_RANGE();
// Define the column projection and filter predicates for the source tables
std::vector<std::string> const lineitem_cols = {"l_returnflag",
"l_linestatus",
"l_quantity",
Expand All @@ -126,16 +128,8 @@
auto const lineitem_pred = std::make_unique<cudf::ast::operation>(
cudf::ast::ast_operator::LESS_EQUAL, shipdate_ref, shipdate_upper_literal);

if (source == "cudf_datagen") {
auto [o, l, p] = cudf::datagen::generate_orders_lineitem_part(
get_sf(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto lineitem =
std::make_unique<table_with_names>(std::move(l), cudf::datagen::schema::LINEITEM);
auto lineitem_projected = apply_projection(std::move(lineitem), lineitem_cols);
return apply_filter(std::move(lineitem_projected), *lineitem_pred);
} else {
return read_parquet(source + "/lineitem.parquet", lineitem_cols, std::move(lineitem_pred));
}
return read_parquet(
sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred));
}

int main(int argc, char const** argv)
Expand All @@ -146,17 +140,21 @@ int main(int argc, char const** argv)
auto resource = create_memory_resource(args.memory_resource_type);
rmm::mr::set_current_device_resource(resource.get());

// Print hardware stats
print_hardware_stats();
// Print device information
print_device_info();

// Instantiate the memory stats logger
auto const mem_stats_logger = memory_stats_logger();

// Generate a set of data sources
std::unordered_map<std::string, parquet_source> sources;
populate_parquet_sources(args.dataset_dir, {"lineitem"}, sources);

// Start timer
cudf::examples::timer timer;

// Prepare the dataset
auto lineitem = prepare_dataset(args.dataset_dir);
// Read the dataset using the data sources
auto lineitem = read_parquet_sources(sources);

// Calculate the discount price and charge columns and append to lineitem table
auto disc_price =
Expand Down
68 changes: 22 additions & 46 deletions cpp/examples/tpch/q10.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
#include <cudf/column/column.hpp>
#include <cudf/scalar/scalar.hpp>

#include <cudf_benchmark/tpch_datagen.hpp>

/**
* @file q10.cpp
* @brief Implement query 10 of the TPC-H benchmark.
Expand Down Expand Up @@ -79,6 +77,7 @@
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource())
{
CUDF_FUNC_RANGE();
auto const one = cudf::numeric_scalar<double>(1);
auto const one_minus_discount =
cudf::binary_operation(one, discount, cudf::binary_operator::SUB, discount.type(), stream, mr);
Expand All @@ -93,12 +92,13 @@
}

/**
* @brief Generate or read the dataset
* @brief Read out the dataset from a map of parquet data sources
*
* @param source The dataset source
* @param sources The map of parquet data sources
*/
[[nodiscard]] auto prepare_dataset(std::string source)
[[nodiscard]] auto read_parquet_sources(std::unordered_map<std::string, parquet_source>& sources)
{
CUDF_FUNC_RANGE();
// Define the column projection and filter predicates for the source tables
std::vector<std::string> const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"};
std::vector<std::string> const lineitem_cols = {
Expand Down Expand Up @@ -128,43 +128,15 @@
auto const lineitem_pred = std::make_unique<cudf::ast::operation>(
cudf::ast::ast_operator::EQUAL, l_returnflag_ref, r_literal);

if (source == "cudf_datagen") {
auto [o, l, p] = cudf::datagen::generate_orders_lineitem_part(
get_sf(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto orders = std::make_unique<table_with_names>(std::move(o), cudf::datagen::schema::ORDERS);
auto orders_projected = apply_projection(std::move(orders), orders_cols);
auto orders_filtered = apply_filter(std::move(orders_projected), *orders_pred);

auto lineitem =
std::make_unique<table_with_names>(std::move(l), cudf::datagen::schema::LINEITEM);
auto lineitem_projected = apply_projection(std::move(lineitem), lineitem_cols);
auto lineitem_filtered = apply_filter(std::move(lineitem_projected), *lineitem_pred);

auto c = cudf::datagen::generate_customer(
get_sf(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto customer =
std::make_unique<table_with_names>(std::move(c), cudf::datagen::schema::CUSTOMER);
auto customer_projected = apply_projection(std::move(customer), customer_cols);

auto n = cudf::datagen::generate_nation(cudf::get_default_stream(),
rmm::mr::get_current_device_resource());
auto nation = std::make_unique<table_with_names>(std::move(n), cudf::datagen::schema::NATION);
auto nation_projected = apply_projection(std::move(nation), nation_cols);

return std::make_tuple(std::move(orders_filtered),
std::move(lineitem_filtered),
std::move(customer_projected),
std::move(nation_projected));
} else {
auto orders = read_parquet(source + "/orders.parquet", orders_cols, std::move(orders_pred));
auto lineitem =
read_parquet(source + "/lineitem.parquet", lineitem_cols, std::move(lineitem_pred));
auto customer = read_parquet(source + "/customer.parquet", customer_cols);
auto nation = read_parquet(source + "/nation.parquet", nation_cols);

return std::make_tuple(
std::move(orders), std::move(lineitem), std::move(customer), std::move(nation));
}
auto orders =
read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred));
auto lineitem =
read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred));
auto customer = read_parquet(sources["customer"].make_source_info(), customer_cols);
auto nation = read_parquet(sources["nation"].make_source_info(), nation_cols);

return std::make_tuple(
std::move(orders), std::move(lineitem), std::move(customer), std::move(nation));
}

int main(int argc, char const** argv)
Expand All @@ -175,17 +147,21 @@ int main(int argc, char const** argv)
auto resource = create_memory_resource(args.memory_resource_type);
rmm::mr::set_current_device_resource(resource.get());

// Print hardware stats
print_hardware_stats();
// Print device information
print_device_info();

// Instantiate the memory stats logger
auto const mem_stats_logger = memory_stats_logger();

// Generate a set of data sources
std::unordered_map<std::string, parquet_source> sources;
populate_parquet_sources(args.dataset_dir, {"orders", "lineitem", "customer", "nation"}, sources);

// Start timer
cudf::examples::timer timer;

// Prepare the dataset
auto [orders, lineitem, customer, nation] = prepare_dataset(args.dataset_dir);
// Read the dataset using the data sources
auto [orders, lineitem, customer, nation] = read_parquet_sources(sources);

// Perform the joins
auto const join_a = apply_inner_join(customer, nation, {"c_nationkey"}, {"n_nationkey"});
Expand Down
106 changes: 31 additions & 75 deletions cpp/examples/tpch/q5.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
#include <cudf/column/column.hpp>
#include <cudf/scalar/scalar.hpp>

#include <cudf_benchmark/tpch_datagen.hpp>

/**
* @file q5.cpp
* @brief Implement query 5 of the TPC-H benchmark.
Expand Down Expand Up @@ -74,6 +72,7 @@
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource())
{
CUDF_FUNC_RANGE();
auto const one = cudf::numeric_scalar<double>(1);
auto const one_minus_discount =
cudf::binary_operation(one, discount, cudf::binary_operator::SUB, discount.type(), stream, mr);
Expand All @@ -88,13 +87,14 @@
}

/**
* @brief Generate or read the dataset
* @brief Read out the dataset from a map of parquet data sources
*
* @param source The dataset source
* @param sources The map of parquet data sources
*/
[[nodiscard]] auto prepare_dataset(std::string source)
[[nodiscard]] auto read_parquet_sources(std::unordered_map<std::string, parquet_source>& sources)
{
// Define the column projection and filter predicate for the `orders` table
CUDF_FUNC_RANGE();
// Define the column projection and filter predicates for the source tables
std::vector<std::string> const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"};

auto const o_orderdate_ref = cudf::ast::column_reference(std::distance(
Expand All @@ -112,21 +112,12 @@
auto const orders_pred = std::make_unique<cudf::ast::operation>(
cudf::ast::ast_operator::LOGICAL_AND, o_orderdate_pred_lower, o_orderdate_pred_upper);

// Define the column projection for the `lineitem` table
std::vector<std::string> const lineitem_cols = {
"l_orderkey", "l_suppkey", "l_extendedprice", "l_discount"};

// Define the column projection for the `customer` table
std::vector<std::string> const customer_cols = {"c_custkey", "c_nationkey"};

// Define the column projection for the `supplier` table
std::vector<std::string> const supplier_cols = {"s_suppkey", "s_nationkey"};

// Define the column projection for the `nation` table
std::vector<std::string> const nation_cols = {"n_nationkey", "n_regionkey", "n_name"};

// Define the column projection and filter predicate for the `region` table
std::vector<std::string> const region_cols = {"r_regionkey", "r_name"};
std::vector<std::string> const nation_cols = {"n_nationkey", "n_regionkey", "n_name"};
std::vector<std::string> const region_cols = {"r_regionkey", "r_name"};

auto const r_name_ref = cudf::ast::column_reference(std::distance(
region_cols.begin(), std::find(region_cols.begin(), region_cols.end(), "r_name")));
Expand All @@ -135,60 +126,20 @@
auto const region_pred = std::make_unique<cudf::ast::operation>(
cudf::ast::ast_operator::EQUAL, r_name_ref, r_name_literal);

if (source == "cudf_datagen") {
auto [o, l, _] = cudf::datagen::generate_orders_lineitem_part(
get_sf(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto orders = std::make_unique<table_with_names>(std::move(o), cudf::datagen::schema::ORDERS);
auto orders_projected = apply_projection(std::move(orders), orders_cols);
auto orders_filtered = apply_filter(std::move(orders_projected), *orders_pred);

auto lineitem =
std::make_unique<table_with_names>(std::move(l), cudf::datagen::schema::LINEITEM);
auto lineitem_projected = apply_projection(std::move(lineitem), lineitem_cols);

auto c = cudf::datagen::generate_customer(
get_sf(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto customer =
std::make_unique<table_with_names>(std::move(c), cudf::datagen::schema::CUSTOMER);
auto customer_projected = apply_projection(std::move(customer), customer_cols);

auto s = cudf::datagen::generate_supplier(
get_sf(), cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto supplier =
std::make_unique<table_with_names>(std::move(s), cudf::datagen::schema::SUPPLIER);
auto supplier_projected = apply_projection(std::move(supplier), supplier_cols);

auto n = cudf::datagen::generate_nation(cudf::get_default_stream(),
rmm::mr::get_current_device_resource());
auto nation = std::make_unique<table_with_names>(std::move(n), cudf::datagen::schema::NATION);
auto nation_projected = apply_projection(std::move(nation), nation_cols);

auto r = cudf::datagen::generate_region(cudf::get_default_stream(),
rmm::mr::get_current_device_resource());
auto region = std::make_unique<table_with_names>(std::move(r), cudf::datagen::schema::REGION);
auto region_projected = apply_projection(std::move(region), region_cols);
auto region_filtered = apply_filter(std::move(region_projected), *region_pred);

return std::make_tuple(std::move(orders_filtered),
std::move(lineitem_projected),
std::move(customer_projected),
std::move(supplier_projected),
std::move(nation_projected),
std::move(region_filtered));
} else {
auto orders = read_parquet(source + "/orders.parquet", orders_cols, std::move(orders_pred));
auto customer = read_parquet(source + "/customer.parquet", customer_cols);
auto lineitem = read_parquet(source + "/lineitem.parquet", lineitem_cols);
auto supplier = read_parquet(source + "/supplier.parquet", supplier_cols);
auto nation = read_parquet(source + "/nation.parquet", nation_cols);
auto region = read_parquet(source + "/region.parquet", region_cols, std::move(region_pred));
return std::make_tuple(std::move(orders),
std::move(lineitem),
std::move(customer),
std::move(supplier),
std::move(nation),
std::move(region));
}
auto orders =
read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred));
auto customer = read_parquet(sources["customer"].make_source_info(), customer_cols);
auto lineitem = read_parquet(sources["lineitem"].make_source_info(), lineitem_cols);
auto supplier = read_parquet(sources["supplier"].make_source_info(), supplier_cols);
auto nation = read_parquet(sources["nation"].make_source_info(), nation_cols);
auto region =
read_parquet(sources["region"].make_source_info(), region_cols, std::move(region_pred));
return std::make_tuple(std::move(orders),
std::move(lineitem),
std::move(customer),
std::move(supplier),
std::move(nation),
std::move(region));
}

int main(int argc, char const** argv)
Expand All @@ -199,17 +150,22 @@ int main(int argc, char const** argv)
auto resource = create_memory_resource(args.memory_resource_type);
rmm::mr::set_current_device_resource(resource.get());

// Print hardware stats
print_hardware_stats();
// Print device information
print_device_info();

// Instantiate the memory stats logger
auto const mem_stats_logger = memory_stats_logger();

// Generate a set of data sources
std::unordered_map<std::string, parquet_source> sources;
populate_parquet_sources(
args.dataset_dir, {"customer", "orders", "lineitem", "supplier", "nation", "region"}, sources);

// Start timer
cudf::examples::timer timer;

// Prepare the dataset
auto [orders, lineitem, customer, supplier, nation, region] = prepare_dataset(args.dataset_dir);
// Read the dataset using the data sources
auto [orders, lineitem, customer, supplier, nation, region] = read_parquet_sources(sources);

// Perform the joins
auto const join_a = apply_inner_join(region, nation, {"r_regionkey"}, {"n_regionkey"});
Expand Down
Loading

0 comments on commit c99b225

Please sign in to comment.