Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cxxopts in Parquet-tools #215

Open
wants to merge 12 commits into
base: v0.8-dev
Choose a base branch
from
18 changes: 18 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,24 @@ include(FindArrowParquet)
option(YGM_REQUIRE_ARROW_PARQUET "YGM requires Apache Arrow Parquet." OFF)
find_arrow_parquet()

#
# cxxopts
#
# Look for an installed cxxopts package first (config mode, version 3.2
# requested, QUIET so a miss is not reported). When none is found, fetch the
# v3.2.0 tag from the upstream repository with FetchContent instead.
find_package(cxxopts 3.2 CONFIG QUIET)
if (NOT cxxopts_FOUND)
    FetchContent_Declare(
            cxxopts
            GIT_REPOSITORY https://github.com/jarro2783/cxxopts.git
            GIT_TAG v3.2.0)
    FetchContent_MakeAvailable(cxxopts)
    message(STATUS ${PROJECT_NAME} " cloned cxxopts dependency: "
            ${cxxopts_SOURCE_DIR})
else ()
    # NOTE(review): cxxopts_INCLUDE_DIRS is presumably provided by the cxxopts
    # package config; if it is not, this line prints an empty path -- confirm.
    message(STATUS ${PROJECT_NAME} " found cxxopts include dirs: "
            ${cxxopts_INCLUDE_DIRS})
endif ()

#
# Create the YGM target library
#
Expand Down
6 changes: 4 additions & 2 deletions tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
# SPDX-License-Identifier: MIT

if (Arrow_FOUND AND Parquet_FOUND)
if (Boost_FOUND)
if (Boost_FOUND AND TARGET cxxopts::cxxopts)
add_ygm_example(parquet_tools)
target_include_directories(parquet_tools PUBLIC ${Boost_INCLUDE_DIRS})
target_include_directories(parquet_tools PRIVATE ${Boost_INCLUDE_DIRS})
link_arrow_parquet(parquet_tools)
# Link against the cxxopts::cxxopts target only. target_include_directories()
# expects directory paths, so passing a target name there would just add a
# bogus include path; linking the target is what propagates its usage
# requirements (include directories among them) to parquet_tools.
target_link_libraries(parquet_tools PRIVATE cxxopts::cxxopts)
endif ()
endif ()
file(COPY ./parquet_tools_subcmd.json DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
160 changes: 104 additions & 56 deletions tools/parquet_tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

#include <arrow/io/file.h>
#include <parquet/stream_writer.h>
#include <boost/json.hpp>
#include <cxxopts.hpp>

#include <ygm/comm.hpp>
#include <ygm/io/csv_parser.hpp>
Expand All @@ -37,84 +39,90 @@ static constexpr char const* const SCHEMA = "schema";
static constexpr char const* const DUMP = "dump";
static constexpr char const* const CONVERT = "convert";

bool parse_arguments(int argc, char** argv, options_t&, bool&);
template <typename os_t>
void show_usage(char** argv, os_t&);
void count_rows(const options_t&, ygm::comm&);
void dump(const options_t&, ygm::comm&);
void convert(const options_t&, ygm::comm&);
bool parse_arguments(int, char**, std::ostream&, options_t&, bool&);
void show_usage(char** argv, std::ostream&);
bool count_rows(const options_t&, ygm::comm&);
bool show_schema(const options_t&, ygm::comm&);
bool dump(const options_t&, ygm::comm&);
bool convert(const options_t&, ygm::comm&);

int main(int argc, char** argv) {
bool ret = false;
ygm::comm world(&argc, &argv);
{
options_t opt;
bool show_help = false;
if (!parse_arguments(argc, argv, opt, show_help)) {
world.cerr0() << "Invalid arguments." << std::endl;
if (world.rank0()) show_usage(argv, std::cerr);
if (!parse_arguments(argc, argv, world.cerr0(), opt, show_help)) {
return EXIT_FAILURE;
}
if (show_help) {
if (world.rank0()) show_usage(argv, std::cout);
show_usage(argv, world.cerr0());
return 0;
}

if (opt.subcommand == ROWCOUNT) {
count_rows(opt, world);
ret = count_rows(opt, world);
} else if (opt.subcommand == SCHEMA) {
world.cout0() << "Schema" << std::endl;
ygm::io::parquet_parser parquetp(world, {opt.input_path.c_str()});
world.cout0() << parquetp.schema_to_string() << std::endl;
ret = show_schema(opt, world);
} else if (opt.subcommand == DUMP) {
dump(opt, world);
ret = dump(opt, world);
} else if (opt.subcommand == CONVERT) {
convert(opt, world);
ret = convert(opt, world);
} else {
world.cerr0() << "Unknown subcommand: " << opt.subcommand << std::endl;
return EXIT_FAILURE;
}
}
world.barrier();

return 0;
if (!ret) {
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}

bool parse_arguments(int argc, char** argv, options_t& options,
bool& show_help) {
int opt;
while ((opt = getopt(argc, argv, "c:i:vjro:h")) != -1) {
switch (opt) {
case 'c':
options.subcommand = optarg;
break;
case 'i':
options.input_path = optarg;
break;
case 'r':
options.read_lines = true;
break;
case 'v':
options.read_lines = true;
options.variant = true;
break;
case 'j':
options.read_lines = true;
options.json = true;
break;
case 'o':
options.output_file_prefix = optarg;
break;
case 'h':
show_help = true;
break;
default:
return false;
/// Parse command line arguments using cxxopts.
///
/// \param argc      Argument count, forwarded to cxxopts.
/// \param argv      Argument vector, forwarded to cxxopts.
/// \param os        Stream that receives the message when parsing fails.
/// \param options   Receives the parsed values. The subcommand, input path and
///                  output prefix are bound to cxxopts by reference; the
///                  boolean flags are only ever switched to true here.
/// \param show_help Set to true when -h/--help is present; otherwise left
///                  untouched.
/// \return false if cxxopts reported a parse error (already written to `os`),
///         true otherwise, including the help case.
bool parse_arguments(int argc, char** argv, std::ostream& os,
                     options_t& options, bool& show_help) {
  try {
    cxxopts::Options options_desc("parquet-tools",
                                  "A tool for parquet file manipulation.");
    // clang-format off
    options_desc.add_options()
        ("c,command", "Subcommand name.",
         cxxopts::value(options.subcommand))
        ("i,input", "Input file path.",
         cxxopts::value(options.input_path))
        ("r,read-lines", "Read rows w/o converting.")
        ("v,variant", "Read as variants.")
        ("j,json", "Read as JSON objects.")
        ("o,output", "Output file prefix.",
         cxxopts::value(options.output_file_prefix))
        ("h,help", "Show this help message.");
    // clang-format on

    const auto result = options_desc.parse(argc, argv);

    // Help wins over everything else: report it and skip the flag handling.
    if (result.count("help")) {
      show_help = true;
      return true;
    }

    if (result.count("read-lines")) {
      options.read_lines = true;
    }

    // -v and -j both imply reading the rows. The previous getopt-based parser
    // set read_lines for these two flags as well, so keep that behaviour
    // instead of requiring users to add -r explicitly.
    if (result.count("variant")) {
      options.read_lines = true;
      options.variant    = true;
    }

    if (result.count("json")) {
      options.read_lines = true;
      options.json       = true;
    }
  } catch (const cxxopts::exceptions::exception& e) {
    os << "Error parsing options: " << e.what() << std::endl;
    return false;
  }

  return true;
}

// Only for rank 0
template <typename os_t>
void show_usage(char** argv, os_t& os) {
void show_usage(char** argv, std::ostream& os) {
os << "[Usage]" << std::endl;
os << "mpirun -np <#of ranks> ./parquet-tools [options]" << std::endl;
os << std::endl;
Expand Down Expand Up @@ -160,7 +168,7 @@ void show_usage(char** argv, os_t& os) {
os << " Required arguments" << std::endl;
for (const auto& req : entry_obj.at("req").as_array()) {
auto& req_obj = req.as_object();
os << " -" << format(req_obj.at("key").as_string()) << " ";
os << " " << format(req_obj.at("key").as_string()) << " ";
if (req_obj.contains("value")) {
os << " <" << format(req_obj.at("value").as_string()) << "> ";
}
Expand All @@ -173,7 +181,7 @@ void show_usage(char** argv, os_t& os) {
for (const auto& op : entry_obj.at("opt").as_array()) {
auto& op_obj = op.as_object();
assert(op_obj.contains("key"));
os << " -" << format(op_obj.at("key").as_string()) << " ";
os << " " << format(op_obj.at("key").as_string()) << " ";
if (op_obj.contains("value")) {
assert(op_obj.contains("value"));
os << " <" << format(op_obj.at("value").as_string()) << "> ";
Expand All @@ -186,7 +194,12 @@ void show_usage(char** argv, os_t& os) {
}
}

void count_rows(const options_t& opt, ygm::comm& world) {
bool count_rows(const options_t& opt, ygm::comm& world) {
if (opt.input_path.empty()) {
world.cerr0() << "Input file path is empty." << std::endl;
return false;
}

if (opt.variant) {
world.cout0() << "Read as variants." << std::endl;
} else if (opt.json) {
Expand Down Expand Up @@ -235,9 +248,32 @@ void count_rows(const options_t& opt, ygm::comm& world) {
world.cout0() << "#of conversion error lines = "
<< world.all_reduce_sum(num_error_lines) << std::endl;
}
return true;
}

void dump(const options_t& opt, ygm::comm& world) {
bool show_schema(const options_t& opt, ygm::comm& world) {
if (opt.input_path.empty()) {
world.cerr0() << "Input file path is empty." << std::endl;
return false;
}

world.cout0() << "Schema" << std::endl;
ygm::io::parquet_parser parquetp(world, {opt.input_path.c_str()});
world.cout0() << parquetp.schema_to_string() << std::endl;
return true;
}

bool dump(const options_t& opt, ygm::comm& world) {
if (opt.input_path.empty()) {
world.cerr0() << "Input file path is empty." << std::endl;
return false;
}

if (opt.output_file_prefix.empty()) {
world.cerr0() << "Output file prefix is empty." << std::endl;
return false;
}

if (opt.json) {
world.cout0() << "Dump as JSON objects." << std::endl;
} else {
Expand All @@ -256,7 +292,7 @@ void dump(const options_t& opt, ygm::comm& world) {
if (!ofs) {
world.cerr0() << "Failed to open the output file: " << output_path
<< std::endl;
::MPI_Abort(world.get_mpi_comm(), EXIT_FAILURE);
return false;
}

ygm::timer timer{};
Expand Down Expand Up @@ -293,7 +329,7 @@ void dump(const options_t& opt, ygm::comm& world) {
if (!ofs) {
world.cerr0() << "Failed to write the output file: " << output_path
<< std::endl;
::MPI_Abort(world.get_mpi_comm(), EXIT_FAILURE);
return false;
}
const auto elapsed_time = timer.elapsed();
num_rows = world.all_reduce_sum(num_rows);
Expand All @@ -304,9 +340,20 @@ void dump(const options_t& opt, ygm::comm& world) {
world.cout0() << "#of conversion error lines = "
<< world.all_reduce_sum(num_error_lines) << std::endl;
}
return true;
}

void convert(const options_t& opt, ygm::comm& world) {
bool convert(const options_t& opt, ygm::comm& world) {
if (opt.input_path.empty()) {
world.cerr0() << "Input file path is empty." << std::endl;
return false;
}

if (opt.output_file_prefix.empty()) {
world.cerr0() << "Output file prefix is empty." << std::endl;
return false;
}

std::string output_path =
std::string(opt.output_file_prefix) + "-" + std::to_string(world.rank());
std::cout << "Output path: " << output_path << std::endl;
Expand Down Expand Up @@ -378,4 +425,5 @@ void convert(const options_t& opt, ygm::comm& world) {
if (schema_defined) {
parquet_writer << parquet::EndRowGroup;
}
return true;
}
22 changes: 11 additions & 11 deletions tools/parquet_tools_subcmd.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@
"desc": "Return the number of rows in parquet files. If no subcommand option was specified, return the value stored in the metadata without reading the whole data or counting the number of lines.",
"req": [
{
"key": "i",
"key": "-i, --input",
"value": "path",
"desc": "Parquet file path or a directory path that contains parquet files. All parquet files must have the same schema."
}
],
"opt": [
{
"key": "r",
"key": "-r, --read-lines",
"desc": "Read rows w/o converting."
},
{
"key": "v",
"key": "-v, --variant",
"desc": "Read rows converting to arrays of std::variant."
},
{
"key": "j",
"key": "-j, --json",
"desc": "Read rows converting to arrays of JSON objects."
}
]
Expand All @@ -29,7 +29,7 @@
"desc": "Show the schemas of parquet files.",
"req": [
{
"key": "i",
"key": "-i, --input",
"value": "path",
"desc": "Parquet file path or a directory path that contains parquet files. All parquet files must have the same schema."
}
Expand All @@ -40,23 +40,23 @@
"desc": "Dump data to files. One output file per rank.",
"req": [
{
"key": "i",
"key": "-i, --input",
"value": "path",
"desc": "Parquet file path or a directory path that contains parquet files. All parquet files must have the same schema."
},
{
"key": "o",
"key": "-o, --output",
"value": "path",
"desc": "Prefix of output files."
}
],
"opt": [
{
"key": "v",
"key": "-v, --variant",
"desc": "Dump rows converting to arrays of std::variant (default)"
},
{
"key": "j",
"key": "-j, --json",
"desc": "Dump rows converting to arrays of JSON objects."
}
]
Expand All @@ -66,12 +66,12 @@
"desc": "Convert files to parquet files. Currently, only CSV is supported.",
"req": [
{
"key": "i",
"key": "-i, --input",
"value": "path",
"desc": "Path to an input non-parquet file or to a directory that contains non-parquet files. All CSV files must have the same column types."
},
{
"key": "o",
"key": "-o, --output",
"value": "path",
"desc": "Prefix of output parquet files."
}
Expand Down