diff --git a/components/core/src/clp_s/JsonParser.cpp b/components/core/src/clp_s/JsonParser.cpp index f8745dc24..654236b5e 100644 --- a/components/core/src/clp_s/JsonParser.cpp +++ b/components/core/src/clp_s/JsonParser.cpp @@ -601,22 +601,9 @@ bool JsonParser::parse() { ); } - if (auto network_reader = std::dynamic_pointer_cast(reader); - nullptr != network_reader) - { - if (auto const rc = network_reader->get_curl_ret_code(); - rc.has_value() && CURLcode::CURLE_OK != rc.value()) - { - auto const curl_error_message = network_reader->get_curl_error_msg(); - SPDLOG_ERROR( - "Encountered curl error while ingesting {} - Code: {} - Message: {}", - path.path, - static_cast(rc.value()), - curl_error_message.value_or("Unknown error") - ); - m_archive_writer->close(); - return false; - } + if (check_and_log_curl_error(path, reader)) { + m_archive_writer->close(); + return false; } } return true; @@ -818,21 +805,30 @@ void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) { } auto JsonParser::parse_from_ir() -> bool { + constexpr size_t cDecompressorReadBufferCapacity{64 * 1024}; // 64 KB for (auto const& path : m_input_paths) { - // TODO: add support for ingesting IR from a network source - if (InputSource::Filesystem != path.source) { + auto reader{ReaderUtils::try_create_reader(path, m_network_auth)}; + if (nullptr == reader) { m_archive_writer->close(); return false; } + clp::streaming_compression::zstd::Decompressor decompressor; size_t curr_pos{}; size_t last_pos{}; - decompressor.open(path.path); + decompressor.open(*reader, cDecompressorReadBufferCapacity); auto deserializer_result{Deserializer::create(decompressor, IrUnitHandler{}) }; if (deserializer_result.has_error()) { + auto err = deserializer_result.error(); + SPDLOG_ERROR( + "Encountered error while creating kv-ir deserializer: ({}) - {}", + err.value(), + err.message() + ); decompressor.close(); + check_and_log_curl_error(path, reader); m_archive_writer->close(); return false; } @@ -851,8 
+847,15 @@ auto JsonParser::parse_from_ir() -> bool { auto const kv_log_event_result{deserializer.deserialize_next_ir_unit(decompressor)}; if (kv_log_event_result.has_error()) { + auto err = kv_log_event_result.error(); + SPDLOG_ERROR( + "Encountered error while deserializing kv-ir log event: ({}) - {}", + err.value(), + err.message() + ); m_archive_writer->close(); decompressor.close(); + check_and_log_curl_error(path, reader); return false; } if (kv_log_event_result.value() == clp::ffi::ir_stream::IrUnitType::EndOfStream) { @@ -883,7 +886,7 @@ auto JsonParser::parse_from_ir() -> bool { if (m_archive_writer->get_data_size() >= m_target_encoded_size) { m_ir_node_to_archive_node_id_mapping.clear(); - decompressor.try_get_pos(curr_pos); + curr_pos = decompressor.get_pos(); m_archive_writer->increment_uncompressed_size(curr_pos - last_pos); last_pos = curr_pos; split_archive(); @@ -898,13 +901,17 @@ auto JsonParser::parse_from_ir() -> bool { { continue; } else { + SPDLOG_ERROR( + "Encountered unknown IR unit type ({}) during deserialization.", + static_cast(kv_log_event_result.value()) + ); m_archive_writer->close(); decompressor.close(); return false; } } m_ir_node_to_archive_node_id_mapping.clear(); - decompressor.try_get_pos(curr_pos); + curr_pos = decompressor.get_pos(); m_archive_writer->increment_uncompressed_size(curr_pos - last_pos); decompressor.close(); } @@ -921,4 +928,26 @@ void JsonParser::split_archive() { m_archive_writer->open(m_archive_options); } +bool JsonParser::check_and_log_curl_error( + Path const& path, + std::shared_ptr reader +) { + if (auto network_reader = std::dynamic_pointer_cast(reader); + nullptr != network_reader) + { + if (auto const rc = network_reader->get_curl_ret_code(); + rc.has_value() && CURLcode::CURLE_OK != rc.value()) + { + auto const curl_error_message = network_reader->get_curl_error_msg(); + SPDLOG_ERROR( + "Encountered curl error while ingesting {} - Code: {} - Message: {}", + path.path, + static_cast(rc.value()), + 
curl_error_message.value_or("Unknown error") + ); + return true; + } + } + return false; +} } // namespace clp_s diff --git a/components/core/src/clp_s/JsonParser.hpp b/components/core/src/clp_s/JsonParser.hpp index 12199df6c..bf29de1be 100644 --- a/components/core/src/clp_s/JsonParser.hpp +++ b/components/core/src/clp_s/JsonParser.hpp @@ -18,6 +18,7 @@ #include "../clp/ffi/SchemaTree.hpp" #include "../clp/ffi/Value.hpp" #include "../clp/GlobalMySQLMetadataDB.hpp" +#include "../clp/ReaderInterface.hpp" #include "ArchiveWriter.hpp" #include "CommandLineArguments.hpp" #include "DictionaryWriter.hpp" @@ -169,6 +170,16 @@ class JsonParser { */ int32_t add_metadata_field(std::string_view const field_name, NodeType type); + /** + * Checks if a reader interface is a clp::NetworkReader that has encountered a CURL error and + * logs relevant CURL error information if a CURL error has occurred. + * @param path + * @param reader + * @return true if the provided ReaderInterface has experienced a CURL error and false otherwise + */ + static bool + check_and_log_curl_error(Path const& path, std::shared_ptr reader); + int m_num_messages; std::vector m_input_paths; NetworkAuthOption m_network_auth{}; diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index 21c1b84b3..4d1080bdc 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -33,7 +33,6 @@ #include "search/Projection.hpp" #include "search/SchemaMatch.hpp" #include "TimestampPattern.hpp" -#include "TraceableException.hpp" #include "Utils.hpp" using namespace clp_s::search; @@ -209,7 +208,7 @@ bool search_archive( } projection->add_column(ColumnDescriptor::create_from_escaped_tokens(descriptor_tokens)); } - } catch (clp_s::TraceableException& e) { + } catch (std::exception const& e) { SPDLOG_ERROR("{}", e.what()); return false; } @@ -253,7 +252,7 @@ bool search_archive( SPDLOG_ERROR("Unhandled OutputHandlerType."); return false; } - } catch 
(clp_s::TraceableException& e) { + } catch (std::exception const& e) { SPDLOG_ERROR("Failed to create output handler - {}", e.what()); return false; } @@ -324,8 +323,8 @@ int main(int argc, char const* argv[]) { option.archive_path = archive_path; decompress_archive(option); } - } catch (clp_s::TraceableException& e) { - SPDLOG_ERROR("{}", e.what()); + } catch (std::exception const& e) { + SPDLOG_ERROR("Encountered error during decompression - {}", e.what()); return 1; } } else {