Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp-s): Add support for ingestion from S3 in the kv-ir ingestion flow; Improve error messages in the kv-ir ingestion flow. #706

Merged
merged 6 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 50 additions & 21 deletions components/core/src/clp_s/JsonParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,22 +601,9 @@ bool JsonParser::parse() {
);
}

if (auto network_reader = std::dynamic_pointer_cast<clp::NetworkReader>(reader);
nullptr != network_reader)
{
if (auto const rc = network_reader->get_curl_ret_code();
rc.has_value() && CURLcode::CURLE_OK != rc.value())
{
auto const curl_error_message = network_reader->get_curl_error_msg();
SPDLOG_ERROR(
"Encountered curl error while ingesting {} - Code: {} - Message: {}",
path.path,
static_cast<int64_t>(rc.value()),
curl_error_message.value_or("Unknown error")
);
m_archive_writer->close();
return false;
}
if (check_and_log_curl_error(path, reader)) {
m_archive_writer->close();
return false;
}
}
return true;
Expand Down Expand Up @@ -818,21 +805,30 @@ void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) {
}

auto JsonParser::parse_from_ir() -> bool {
constexpr size_t cDecompressorReadBufferCapacity{64 * 1024}; // 64 KB
for (auto const& path : m_input_paths) {
// TODO: add support for ingesting IR from a network source
if (InputSource::Filesystem != path.source) {
auto reader{ReaderUtils::try_create_reader(path, m_network_auth)};
if (nullptr == reader) {
m_archive_writer->close();
return false;
}

clp::streaming_compression::zstd::Decompressor decompressor;
size_t curr_pos{};
size_t last_pos{};
decompressor.open(path.path);
decompressor.open(*reader, cDecompressorReadBufferCapacity);

auto deserializer_result{Deserializer<IrUnitHandler>::create(decompressor, IrUnitHandler{})
};
if (deserializer_result.has_error()) {
auto err = deserializer_result.error();
SPDLOG_ERROR(
"Encountered error while creating kv-ir deserializer: ({}) - {}",
err.value(),
err.message()
);
decompressor.close();
check_and_log_curl_error(path, reader);
m_archive_writer->close();
return false;
}
Expand All @@ -851,8 +847,15 @@ auto JsonParser::parse_from_ir() -> bool {
auto const kv_log_event_result{deserializer.deserialize_next_ir_unit(decompressor)};

if (kv_log_event_result.has_error()) {
auto err = kv_log_event_result.error();
SPDLOG_ERROR(
"Encountered error while deserializing kv-ir log event: ({}) - {}",
err.value(),
err.message()
);
m_archive_writer->close();
decompressor.close();
check_and_log_curl_error(path, reader);
return false;
}
if (kv_log_event_result.value() == clp::ffi::ir_stream::IrUnitType::EndOfStream) {
Expand Down Expand Up @@ -883,7 +886,7 @@ auto JsonParser::parse_from_ir() -> bool {

if (m_archive_writer->get_data_size() >= m_target_encoded_size) {
m_ir_node_to_archive_node_id_mapping.clear();
decompressor.try_get_pos(curr_pos);
curr_pos = decompressor.get_pos();
m_archive_writer->increment_uncompressed_size(curr_pos - last_pos);
last_pos = curr_pos;
split_archive();
Expand All @@ -898,13 +901,17 @@ auto JsonParser::parse_from_ir() -> bool {
{
continue;
} else {
SPDLOG_ERROR(
"Encountered unkown IR unit type ({}) during deserialization.",
static_cast<uint8_t>(kv_log_event_result.value())
);
m_archive_writer->close();
decompressor.close();
return false;
}
}
m_ir_node_to_archive_node_id_mapping.clear();
decompressor.try_get_pos(curr_pos);
curr_pos = decompressor.get_pos();
m_archive_writer->increment_uncompressed_size(curr_pos - last_pos);
decompressor.close();
}
Expand All @@ -921,4 +928,26 @@ void JsonParser::split_archive() {
m_archive_writer->open(m_archive_options);
}

bool JsonParser::check_and_log_curl_error(
Path const& path,
std::shared_ptr<clp::ReaderInterface> reader
) {
if (auto network_reader = std::dynamic_pointer_cast<clp::NetworkReader>(reader);
nullptr != network_reader)
{
if (auto const rc = network_reader->get_curl_ret_code();
rc.has_value() && CURLcode::CURLE_OK != rc.value())
{
auto const curl_error_message = network_reader->get_curl_error_msg();
SPDLOG_ERROR(
"Encountered curl error while ingesting {} - Code: {} - Message: {}",
path.path,
static_cast<int64_t>(rc.value()),
curl_error_message.value_or("Unknown error")
);
return true;
}
}
return false;
}
} // namespace clp_s
11 changes: 11 additions & 0 deletions components/core/src/clp_s/JsonParser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "../clp/ffi/SchemaTree.hpp"
#include "../clp/ffi/Value.hpp"
#include "../clp/GlobalMySQLMetadataDB.hpp"
#include "../clp/ReaderInterface.hpp"
#include "ArchiveWriter.hpp"
#include "CommandLineArguments.hpp"
#include "DictionaryWriter.hpp"
Expand Down Expand Up @@ -169,6 +170,16 @@ class JsonParser {
*/
int32_t add_metadata_field(std::string_view const field_name, NodeType type);

/**
* Checks if a reader interface is a clp::NetworkReader that has encountered a CURL error and
* logs relevant CURL error information if a CURL error has occurred.
* @param path
* @param reader
* @return true if the provided ReaderInterface has experienced a CURL error and false otherwise
*/
static bool
check_and_log_curl_error(Path const& path, std::shared_ptr<clp::ReaderInterface> reader);

int m_num_messages;
std::vector<Path> m_input_paths;
NetworkAuthOption m_network_auth{};
Expand Down
9 changes: 4 additions & 5 deletions components/core/src/clp_s/clp-s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "search/Projection.hpp"
#include "search/SchemaMatch.hpp"
#include "TimestampPattern.hpp"
#include "TraceableException.hpp"
#include "Utils.hpp"

using namespace clp_s::search;
Expand Down Expand Up @@ -209,7 +208,7 @@ bool search_archive(
}
projection->add_column(ColumnDescriptor::create_from_escaped_tokens(descriptor_tokens));
}
} catch (clp_s::TraceableException& e) {
} catch (std::exception const& e) {
SPDLOG_ERROR("{}", e.what());
return false;
}
Expand Down Expand Up @@ -253,7 +252,7 @@ bool search_archive(
SPDLOG_ERROR("Unhandled OutputHandlerType.");
return false;
}
} catch (clp_s::TraceableException& e) {
} catch (std::exception const& e) {
SPDLOG_ERROR("Failed to create output handler - {}", e.what());
return false;
}
Expand Down Expand Up @@ -324,8 +323,8 @@ int main(int argc, char const* argv[]) {
option.archive_path = archive_path;
decompress_archive(option);
}
} catch (clp_s::TraceableException& e) {
SPDLOG_ERROR("{}", e.what());
} catch (std::exception const& e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to change clp_s::TraceableException to std::exception for the search path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the two places that still use it in the search flow? Sure.

SPDLOG_ERROR("Encountered error during decompression - {}", e.what());
return 1;
}
} else {
Expand Down
Loading