Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for parquet reader #191

Merged
merged 3 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/io/arrow_parquet_stream_reader_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ int main(int argc, char** argv) {
<< "Arrow Parquet file parser example (reads data as JSON objects)"
<< std::endl;

std::string dir_name = "../test/data/parquet_files_no_fixed_len_binary/";
// assuming the build directory is inside the YGM root directory
std::string dir_name = "../test/data/parquet_files_json/";
if (argc == 2) {
dir_name = argv[1];
}
Expand Down
72 changes: 33 additions & 39 deletions include/ygm/io/detail/arrow_parquet_json_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,44 +24,39 @@

namespace ygm::io::detail {
inline boost::json::value read_parquet_element_as_json_value(
const ygm::io::parquet_data_type& type_holder,
const ygm::io::parquet_data_type& type_holder,
arrow_parquet_parser::parquet_stream_reader& stream) {
boost::json::value out_value;
out_value.emplace_null();
switch (type_holder.type) {
case parquet::Type::BOOLEAN:
stream >> out_value.emplace_bool();
break;
case parquet::Type::INT32:
stream >> out_value.emplace_int64();
break;
case parquet::Type::INT64:
stream >> out_value.emplace_int64();
break;
case parquet::Type::DOUBLE:
stream >> out_value.emplace_double();
break;
case parquet::Type::FLOAT:
stream >> out_value.emplace_double();
break;
case parquet::Type::BYTE_ARRAY: {
std::string buf;
stream >> buf;
out_value.emplace_string() = buf;
break;
}
case parquet::Type::FIXED_LEN_BYTE_ARRAY: {
throw std::runtime_error("FIXED_LEN_BYTE_ARRAY is not supported");
break;
}
case parquet::Type::INT96:
throw std::runtime_error("INT96 is not supported");
break;
case parquet::Type::UNDEFINED:
[[fallthrough]];
default:
throw std::runtime_error("Undefined data type");
break;
// Note: there is no uint types in Parquet
if (type_holder.type == parquet::Type::BOOLEAN) {
stream >> out_value.emplace_bool();
} else if (type_holder.type == parquet::Type::INT32) {
int32_t buf;
stream >> buf; // need to read to an int32 variable
// Note: there is no int32 type in boost::json
out_value.emplace_int64() = int64_t(buf);
} else if (type_holder.type == parquet::Type::INT64) {
stream >> out_value.emplace_int64();
} else if (type_holder.type == parquet::Type::FLOAT) {
float buf;
stream >> buf; // need to read to a float variable
// Note: there is no float type in boost::json
out_value.emplace_double() = double(buf);
} else if (type_holder.type == parquet::Type::DOUBLE) {
stream >> out_value.emplace_double();
} else if (type_holder.type == parquet::Type::BYTE_ARRAY) {
std::string buf;
stream >> buf;
out_value.emplace_string() = buf;
} else if (type_holder.type == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
throw std::runtime_error("FIXED_LEN_BYTE_ARRAY is not supported");

} else if (type_holder.type == parquet::Type::INT96) {
throw std::runtime_error("INT96 is not supported");

} else {
throw std::runtime_error("Undefined data type");
}

return out_value;
Expand All @@ -75,15 +70,14 @@ inline boost::json::object read_parquet_as_json_helper(
const key_container& include_columns = key_container()) {
boost::json::object object;
for (size_t i = 0; i < schema.size(); ++i) {
const auto& data_type = std::get<0>(schema[i]);
const auto& colum_name = std::get<1>(schema[i]);
const auto& data_type = std::get<0>(schema[i]);
const auto& colum_name = std::get<1>(schema[i]);
if (!read_all &&
std::find(std::begin(include_columns), std::end(include_columns),
colum_name) == std::end(include_columns)) {
continue;
}
object[colum_name] =
read_parquet_element_as_json_value(data_type, reader);
object[colum_name] = read_parquet_element_as_json_value(data_type, reader);
}
reader.SkipColumns(schema.size());
reader.EndRow();
Expand Down
9 changes: 9 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ if (Arrow_FOUND AND Parquet_FOUND)
add_ygm_test(test_arrow_parquet_stream_reader)
target_link_libraries(MPI_test_arrow_parquet_stream_reader PUBLIC
Arrow::arrow_shared Parquet::parquet_shared)

if (Boost_FOUND)
add_ygm_test(test_arrow_parquet_stream_reader_json)
target_include_directories(MPI_test_arrow_parquet_stream_reader_json
PUBLIC
${Boost_INCLUDE_DIRS})
target_link_libraries(MPI_test_arrow_parquet_stream_reader_json PUBLIC
Arrow::arrow_shared Parquet::parquet_shared)
endif()
endif()

#
Expand Down
Binary file added test/data/parquet_files_different_sizes/0.parquet
Binary file not shown.
Binary file added test/data/parquet_files_different_sizes/1.parquet
Binary file not shown.
Binary file added test/data/parquet_files_different_sizes/2.parquet
Binary file not shown.
Binary file added test/data/parquet_files_different_sizes/3.parquet
Binary file not shown.
Binary file added test/data/parquet_files_different_sizes/4.parquet
Binary file not shown.
Binary file added test/data/parquet_files_different_sizes/5.parquet
Binary file not shown.
Binary file added test/data/parquet_files_different_sizes/6.parquet
Binary file not shown.
Binary file added test/data/parquet_files_different_sizes/7.parquet
Binary file not shown.
Binary file added test/data/parquet_files_json/data.parquet
Binary file not shown.
Binary file not shown.
61 changes: 58 additions & 3 deletions test/test_arrow_parquet_stream_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
int main(int argc, char** argv) {
ygm::comm world(&argc, &argv);

// assuming the build directory is inside the YGM root directory
const std::string dir_name = "data/parquet_files/";

//
// Test number of lines in files
{
// assuming the build directory is inside the YGM root directory
const std::string dir_name = "data/parquet_files/";

// arrow_parquet_parser assumes files have identical scehma
ygm::io::arrow_parquet_parser parquetp(world, {dir_name});

Expand All @@ -39,6 +39,9 @@ int main(int argc, char** argv) {
//
// Test table entries
{
// assuming the build directory is inside the YGM root directory
const std::string dir_name = "data/parquet_files/";

// arrow_parquet_parser assumes files have identical scehma
ygm::io::arrow_parquet_parser parquetp(world, {dir_name});

Expand Down Expand Up @@ -77,5 +80,57 @@ int main(int argc, char** argv) {
1);
}

//
// Test the parallel read using files that contain different number of rows
{
// assuming the build directory is inside the YGM root directory
const std::filesystem::path dir_name =
"data/parquet_files_different_sizes/";

// This test case tests the following cases (assuming there are 4
// processes, and Arrow >= v14):
// 1. 0 item files at the top and end.
// 2. read a large file by multiple processes.
// 3. a small file is read by a single process.
// 4. a single process reads multiple files.
// 5. skip files that contain nothing
// 6. total number of rows does not have to
// be splitable evenly by all processes.
//
// Every file contains 1 column, and thre are 11 items in total.
// n-th item's value is 10^n, thus the sum of all value is 11,111,111,111.
ygm::io::arrow_parquet_parser parquetp(
world, {dir_name / "0.parquet", // 0 item
dir_name / "1.parquet", // 7 items
dir_name / "2.parquet", // 0 item
dir_name / "3.parquet", // 0 item
dir_name / "4.parquet", // 2 items
dir_name / "5.parquet", // 1 item
dir_name / "6.parquet", // 1 item
dir_name / "7.parquet"} // 0 item
);

// count total number of rows in the files
size_t local_count = 0;
int64_t local_sum = 0;
parquetp.for_all([&local_sum, &local_count](auto& stream_reader,
const auto& field_count) {
if (field_count > 0) {
int64_t buf;
stream_reader >> buf;
local_sum += buf;
}
stream_reader.SkipColumns(field_count);
stream_reader.EndRow();
local_count++;
});

world.barrier();
const auto sum = world.all_reduce_sum(local_sum);
ASSERT_RELEASE(sum == 11111111111);
const auto row_count = world.all_reduce_sum(local_count);
ASSERT_RELEASE(row_count == 11);
}

return 0;
}
95 changes: 95 additions & 0 deletions test/test_arrow_parquet_stream_reader_json.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2019-2022 Lawrence Livermore National Security, LLC and other YGM
// Project Developers. See the top-level COPYRIGHT file for details.
//
// SPDX-License-Identifier: MIT

#undef NDEBUG

#include <filesystem>
#include <ygm/comm.hpp>
#include <ygm/detail/cereal_boost_json.hpp>
#include <ygm/io/arrow_parquet_parser.hpp>
#include <ygm/io/detail/arrow_parquet_json_converter.hpp>

int main(int argc, char** argv) {
ygm::comm world(&argc, &argv);

const std::string dir_name = "data/parquet_files_json/";

ygm::io::arrow_parquet_parser parquetp(world, {dir_name});

static size_t cnt1 = 0;
static size_t cnt2 = 0;
static size_t cnt3 = 0;

// Test if ygm::io::detail::read_parquet_as_json can read all supported data
// types correctly
const auto& schema = parquetp.schema();
parquetp.for_all([&schema, &world](auto& stream_reader, const auto&) {
const auto obj =
ygm::io::detail::read_parquet_as_json(stream_reader, schema);

world.async(
0,
[](auto, const auto& obj) {
ASSERT_RELEASE(obj.contains("id"));
ASSERT_RELEASE(obj.contains("bool"));
ASSERT_RELEASE(obj.contains("int32"));
ASSERT_RELEASE(obj.contains("int64"));
ASSERT_RELEASE(obj.contains("float"));
ASSERT_RELEASE(obj.contains("double"));
ASSERT_RELEASE(obj.contains("byte_array"));

ASSERT_RELEASE(obj.at("id").is_int64());
ASSERT_RELEASE(obj.at("bool").is_bool());
ASSERT_RELEASE(obj.at("int32").is_int64());
ASSERT_RELEASE(obj.at("int64").is_int64());
ASSERT_RELEASE(obj.at("float").is_double());
ASSERT_RELEASE(obj.at("double").is_double());
ASSERT_RELEASE(obj.at("byte_array").is_string());

const auto id = obj.at("id").as_int64();
if (id == 0) {
ASSERT_RELEASE(obj.at("bool").as_bool() == true);
ASSERT_RELEASE(obj.at("int32").as_int64() == -1);
ASSERT_RELEASE(obj.at("int64").as_int64() == -(1ULL << 32) - 1);
ASSERT_RELEASE(obj.at("float").as_double() == 1.5);
ASSERT_RELEASE(obj.at("double").as_double() == 10.5);
ASSERT_RELEASE(obj.at("byte_array").as_string() == "aa");
++cnt1;
} else if (id == 1) {
ASSERT_RELEASE(obj.at("bool").as_bool() == false);
ASSERT_RELEASE(obj.at("int32").as_int64() == -2);
ASSERT_RELEASE(obj.at("int64").as_int64() == -(1ULL << 32) - 2);
ASSERT_RELEASE(obj.at("float").as_double() == 2.5);
ASSERT_RELEASE(obj.at("double").as_double() == 20.5);
ASSERT_RELEASE(obj.at("byte_array").as_string() == "bb");
++cnt2;
} else if (id == 2) {
ASSERT_RELEASE(obj.at("bool").as_bool() == true);
ASSERT_RELEASE(obj.at("int32").as_int64() == -3);
ASSERT_RELEASE(obj.at("int64").as_int64() == -(1ULL << 32) - 3);
ASSERT_RELEASE(obj.at("float").as_double() == 3.5);
ASSERT_RELEASE(obj.at("double").as_double() == 30.5);
ASSERT_RELEASE(obj.at("byte_array").as_string() == "cc");
++cnt3;
} else {
ASSERT_RELEASE(false);
}
},
obj);
});
world.barrier();

if (world.rank0()) {
ASSERT_RELEASE(cnt1 == 1);
ASSERT_RELEASE(cnt2 == 1);
ASSERT_RELEASE(cnt3 == 1);
} else {
ASSERT_RELEASE(cnt1 == 0);
ASSERT_RELEASE(cnt2 == 0);
ASSERT_RELEASE(cnt3 == 0);
}

return 0;
}