diff --git a/src/etlng/CMakeLists.txt b/src/etlng/CMakeLists.txt index f54f2dd39..5d9e29158 100644 --- a/src/etlng/CMakeLists.txt +++ b/src/etlng/CMakeLists.txt @@ -1,5 +1,5 @@ add_library(clio_etlng) -target_sources(clio_etlng PRIVATE impl/Extraction.cpp) +target_sources(clio_etlng PRIVATE impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp) target_link_libraries(clio_etlng PUBLIC clio_data) diff --git a/src/etlng/InitialLoadObserverInterface.hpp b/src/etlng/InitialLoadObserverInterface.hpp new file mode 100644 index 000000000..1aa164b7c --- /dev/null +++ b/src/etlng/InitialLoadObserverInterface.hpp @@ -0,0 +1,54 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#pragma once + +#include "etlng/Models.hpp" + +#include + +#include +#include +#include +#include + +namespace etlng { + +/** + * @brief The interface for observing the initial ledger load + */ +struct InitialLoadObserverInterface { + virtual ~InitialLoadObserverInterface() = default; + + /** + * @brief Callback for each incoming batch of objects during initial ledger load + * + * @param seq The sequence for this batch of objects + * @param data The batch of objects + * @param lastKey The last key of the previous batch if there was one + */ + virtual void + onInitialLoadGotMoreObjects( + uint32_t seq, + std::vector const& data, + std::optional lastKey = std::nullopt + ) = 0; +}; + +} // namespace etlng diff --git a/src/etlng/impl/AsyncGrpcCall.cpp b/src/etlng/impl/AsyncGrpcCall.cpp new file mode 100644 index 000000000..6fd44dced --- /dev/null +++ b/src/etlng/impl/AsyncGrpcCall.cpp @@ -0,0 +1,188 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/ +//============================================================================== + +#include "etlng/impl/AsyncGrpcCall.hpp" + +#include "etl/ETLHelpers.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/Models.hpp" +#include "etlng/impl/Extraction.hpp" +#include "util/Assert.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace etlng::impl { + +AsyncGrpcCall::AsyncGrpcCall( + uint32_t seq, + ripple::uint256 const& marker, + std::optional const& nextMarker +) +{ + request_.set_user("ETL"); + request_.mutable_ledger()->set_sequence(seq); + + if (marker.isNonZero()) + request_.set_marker(marker.data(), ripple::uint256::size()); + + nextPrefix_ = nextMarker ? nextMarker->data()[0] : 0x00; + auto const prefix = marker.data()[0]; + + LOG(log_.debug()) << "Setting up AsyncGrpcCall. marker = " << ripple::strHex(marker) + << ". prefix = " << ripple::strHex(std::string(1, prefix)) + << ". nextPrefix_ = " << ripple::strHex(std::string(1, nextPrefix_)); + + ASSERT( + nextPrefix_ > prefix or nextPrefix_ == 0x00, + "Next prefix must be greater than current prefix. Got: nextPrefix_ = {}, prefix = {}", + nextPrefix_, + prefix + ); + + cur_ = std::make_unique(); + next_ = std::make_unique(); + context_ = std::make_unique(); +} + +AsyncGrpcCall::CallStatus +AsyncGrpcCall::process( + std::unique_ptr& stub, + grpc::CompletionQueue& cq, + etlng::InitialLoadObserverInterface& loader, + bool abort +) +{ + LOG(log_.trace()) << "Processing response. 
" + << "Marker prefix = " << getMarkerPrefix(); + + if (abort) { + LOG(log_.error()) << "AsyncGrpcCall aborted"; + return CallStatus::ERRORED; + } + + if (!status_.ok()) { + LOG(log_.error()) << "AsyncGrpcCall status_ not ok: code = " << status_.error_code() + << " message = " << status_.error_message(); + + return CallStatus::ERRORED; + } + + if (!next_->is_unlimited()) { + LOG(log_.warn()) << "AsyncGrpcCall is_unlimited is false. " + << "Make sure secure_gateway is set correctly at the ETL source"; + } + + std::swap(cur_, next_); + auto more = true; + + // if no marker returned, we are done + if (cur_->marker().empty()) + more = false; + + // if returned marker is greater than our end, we are done + // NOTE: the byte must be compared as unsigned; a plain char is negative for bytes >= 0x80 where char is signed, + // which would keep this call running past its end prefix + unsigned char const prefix = cur_->marker()[0]; + if (nextPrefix_ != 0x00 && prefix >= nextPrefix_) + more = false; + + // if we are not done, make the next async call + if (more) { + request_.set_marker(cur_->marker()); + call(stub, cq); + } + + auto const numObjects = cur_->ledger_objects().objects_size(); + std::vector data; + data.reserve(numObjects); + + for (int i = 0; i < numObjects; ++i) { + auto obj = std::move(*(cur_->mutable_ledger_objects()->mutable_objects(i))); + if (!more && nextPrefix_ != 0x00) { + if (static_cast(obj.key()[0]) >= nextPrefix_) + continue; + } + + lastKey_ = obj.key(); // this will end up the last key we actually touched eventually + data.push_back(etlng::impl::extractObj(std::move(obj))); + } + + if (not data.empty()) + loader.onInitialLoadGotMoreObjects(request_.ledger().sequence(), data, predecessorKey_); + + predecessorKey_ = lastKey_; // but for ongoing onInitialObjects calls we need to pass along the key we left + // off at so that we can link the two lists correctly + + return more ? 
CallStatus::MORE : CallStatus::DONE; +} + +void +AsyncGrpcCall::call(std::unique_ptr& stub, grpc::CompletionQueue& cq) +{ + context_ = std::make_unique(); + auto rpc = stub->PrepareAsyncGetLedgerData(context_.get(), request_, &cq); + + rpc->StartCall(); + rpc->Finish(next_.get(), &status_, this); +} + +std::string +AsyncGrpcCall::getMarkerPrefix() +{ + return next_->marker().empty() ? std::string{} : ripple::strHex(std::string{next_->marker().data()[0]}); +} + +// this is used to generate edgeKeys - keys that were the last one in the onInitialObjects list +// then we write them all in one go getting the successor from the cache once it's full +std::string +AsyncGrpcCall::getLastKey() +{ + return lastKey_; +} + +std::vector +AsyncGrpcCall::makeAsyncCalls(uint32_t const sequence, uint32_t const numMarkers) +{ + auto const markers = etl::getMarkers(numMarkers); + + std::vector result; + result.reserve(markers.size()); + + for (size_t i = 0; i + 1 < markers.size(); ++i) + result.emplace_back(sequence, markers[i], markers[i + 1]); + + if (not markers.empty()) + result.emplace_back(sequence, markers.back(), std::nullopt); + + return result; +} + +} // namespace etlng::impl diff --git a/src/etlng/impl/AsyncGrpcCall.hpp b/src/etlng/impl/AsyncGrpcCall.hpp new file mode 100644 index 000000000..8cb3edb84 --- /dev/null +++ b/src/etlng/impl/AsyncGrpcCall.hpp @@ -0,0 +1,85 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etlng/InitialLoadObserverInterface.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace etlng::impl { + +class AsyncGrpcCall { +public: + enum class CallStatus { MORE, DONE, ERRORED }; + using RequestType = org::xrpl::rpc::v1::GetLedgerDataRequest; + using ResponseType = org::xrpl::rpc::v1::GetLedgerDataResponse; + using StubType = org::xrpl::rpc::v1::XRPLedgerAPIService::Stub; + +private: + util::Logger log_{"ETL"}; + + std::unique_ptr cur_; + std::unique_ptr next_; + + RequestType request_; + std::unique_ptr context_; + + grpc::Status status_; + unsigned char nextPrefix_; + + std::string lastKey_; + std::optional predecessorKey_; + +public: + AsyncGrpcCall(uint32_t seq, ripple::uint256 const& marker, std::optional const& nextMarker); + + static std::vector + makeAsyncCalls(uint32_t const sequence, uint32_t const numMarkers); + + CallStatus + process( + std::unique_ptr& stub, + grpc::CompletionQueue& cq, + etlng::InitialLoadObserverInterface& loader, + bool abort + ); + + void + call(std::unique_ptr& stub, grpc::CompletionQueue& cq); + + std::string + getMarkerPrefix(); + + std::string + getLastKey(); +}; + +} // namespace etlng::impl diff --git a/src/etlng/impl/GrpcSource.cpp b/src/etlng/impl/GrpcSource.cpp new file mode 100644 index 000000000..fdb679436 --- /dev/null +++ b/src/etlng/impl/GrpcSource.cpp @@ -0,0 +1,168 @@ +//------------------------------------------------------------------------------ +/* + This file is part of 
clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etlng/impl/GrpcSource.hpp" + +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/impl/AsyncGrpcCall.hpp" +#include "util/Assert.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + +std::string +toString(auto const& endpoint) +{ + std::stringstream ss; + ss << endpoint; + return ss.str(); +} + +std::string +resolve(boost::asio::io_context& ctx, std::string const& ip, std::string const& port) +{ + boost::asio::ip::tcp::resolver resolver{ctx}; + + if (auto const result = resolver.resolve(ip, port); not result.empty()) + return toString(result.begin()->endpoint()); + + throw std::runtime_error("Failed to resolve " + ip + ":" + port); +} + +} // namespace + +namespace etlng::impl { + +GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort) + : log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)) +{ + try { + boost::asio::io_context ctx; + grpc::ChannelArguments chArgs; + 
chArgs.SetMaxReceiveMessageSize(-1); + + stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub( + grpc::CreateCustomChannel(resolve(ctx, ip, grpcPort), grpc::InsecureChannelCredentials(), chArgs) + ); + + LOG(log_.debug()) << "Made stub for remote."; + } catch (std::exception const& e) { + LOG(log_.warn()) << "Exception while creating stub: " << e.what() << "."; + } +} + +std::pair +GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighbors) +{ + org::xrpl::rpc::v1::GetLedgerResponse response; + if (!stub_) + return {{grpc::StatusCode::INTERNAL, "No Stub"}, response}; + + // Ledger header with txns and metadata + org::xrpl::rpc::v1::GetLedgerRequest request; + grpc::ClientContext context; + + request.mutable_ledger()->set_sequence(sequence); + request.set_transactions(true); + request.set_expand(true); + request.set_get_objects(getObjects); + request.set_get_object_neighbors(getObjectNeighbors); + request.set_user("ETL"); + + grpc::Status const status = stub_->GetLedger(&context, request, &response); + + if (status.ok() and not response.is_unlimited()) { + log_.warn() << "is_unlimited is false. Make sure secure_gateway is set correctly on the ETL source. 
Status = " + << status.error_message(); + } + + return {status, std::move(response)}; +} + +std::pair, bool> +GrpcSource::loadInitialLedger( + uint32_t const sequence, + uint32_t const numMarkers, + etlng::InitialLoadObserverInterface& observer +) +{ + if (!stub_) + return {{}, false}; + + std::vector calls = AsyncGrpcCall::makeAsyncCalls(sequence, numMarkers); + + LOG(log_.debug()) << "Starting data download for ledger " << sequence << "."; + + grpc::CompletionQueue queue; + for (auto& call : calls) + call.call(stub_, queue); + + std::vector edgeKeys; + void* tag = nullptr; + bool ok = false; + bool abort = false; + size_t numFinished = 0; + + while (numFinished < calls.size() && queue.Next(&tag, &ok)) { + ASSERT(tag != nullptr, "Tag can't be null."); + auto ptr = static_cast(tag); + + if (!ok) { + LOG(log_.error()) << "loadInitialLedger - ok is false"; + return {{}, false}; // cancelled + } + + LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix(); + + auto result = ptr->process(stub_, queue, observer, abort); + if (result != AsyncGrpcCall::CallStatus::MORE) { + ++numFinished; + LOG(log_.debug()) << "Finished a marker. Current number of finished = " << numFinished; + + if (auto lastKey = ptr->getLastKey(); !lastKey.empty()) + edgeKeys.push_back(std::move(lastKey)); + } + + if (result == AsyncGrpcCall::CallStatus::ERRORED) + abort = true; + } + + return {std::move(edgeKeys), !abort}; +} + +} // namespace etlng::impl diff --git a/src/etlng/impl/GrpcSource.hpp b/src/etlng/impl/GrpcSource.hpp new file mode 100644 index 000000000..0111f3330 --- /dev/null +++ b/src/etlng/impl/GrpcSource.hpp @@ -0,0 +1,70 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. 
+ + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etlng/InitialLoadObserverInterface.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace etlng::impl { + +class GrpcSource { + util::Logger log_; + std::unique_ptr stub_; + +public: + GrpcSource(std::string const& ip, std::string const& grpcPort); + + /** + * @brief Fetch data for a specific ledger. + * + * Issues a single GetLedger request for the given sequence (transactions included and expanded); no retries are + * attempted here. If the gRPC stub could not be created, an INTERNAL status is returned without sending a request. + * + * @param sequence Sequence of the ledger to fetch + * @param getObjects Whether to get the account state diff between this ledger and the prior one; defaults to true + * @param getObjectNeighbors Whether to request object neighbors; defaults to false + * @return A std::pair of the response status and the response itself + */ + std::pair + fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false); + + /** + * @brief Download a ledger in full. 
+ * + * @param sequence Sequence of the ledger to download + * @param numMarkers Number of markers to generate for async calls + * @param observer InitialLoadObserverInterface implementation + * @return A std::pair of the data and a bool indicating whether the download was successful + */ + std::pair, bool> + loadInitialLedger(uint32_t sequence, uint32_t numMarkers, etlng::InitialLoadObserverInterface& observer); +}; + +} // namespace etlng::impl diff --git a/src/rpc/common/Validators.cpp b/src/rpc/common/Validators.cpp index 9952a1bd5..f7769d7e6 100644 --- a/src/rpc/common/Validators.cpp +++ b/src/rpc/common/Validators.cpp @@ -305,7 +305,7 @@ CustomValidator CustomValidators::AuthorizeCredentialValidator = return Error{ Status{ClioError::rpcMALFORMED_AUTHORIZED_CREDENTIALS, "Field 'Issuer' is required but missing."} }; -} + } // don't want to change issuer error message to be about credentials if (!IssuerValidator.verify(credObj, "issuer")) diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 501b251cf..baea86b9e 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -32,8 +32,9 @@ target_sources( etl/SubscriptionSourceTests.cpp etl/TransformerTests.cpp # ETLng - etlng/RegistryTests.cpp etlng/ExtractionTests.cpp + etlng/GrpcSourceTests.cpp + etlng/RegistryTests.cpp # Feed feed/BookChangesFeedTests.cpp feed/ForwardFeedTests.cpp diff --git a/tests/unit/etlng/GrpcSourceTests.cpp b/tests/unit/etlng/GrpcSourceTests.cpp new file mode 100644 index 000000000..ec0fa77d2 --- /dev/null +++ b/tests/unit/etlng/GrpcSourceTests.cpp @@ -0,0 +1,285 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. 
+ + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etl/ETLHelpers.hpp" +#include "etl/impl/GrpcSource.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/Models.hpp" +#include "etlng/impl/GrpcSource.hpp" +#include "util/Assert.hpp" +#include "util/LoggerFixtures.hpp" +#include "util/MockPrometheus.hpp" +#include "util/MockXrpLedgerAPIService.hpp" +#include "util/TestObject.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace etlng::model; + +namespace { + +struct MockLoadObserver : etlng::InitialLoadObserverInterface { + MOCK_METHOD( + void, + onInitialLoadGotMoreObjects, + (uint32_t, std::vector const&, std::optional), + (override) + ); +}; + +struct GrpcSourceNgTests : NoLoggerFixture, util::prometheus::WithPrometheus, tests::util::WithMockXrpLedgerAPIService { + GrpcSourceNgTests() + : WithMockXrpLedgerAPIService("localhost:0"), grpcSource_("localhost", std::to_string(getXRPLMockPort())) + { + } + + testing::StrictMock loader_; + testing::StrictMock grpcSource_; +}; + +class KeyStore { + std::vector keys_; + std::map, 
std::greater<>> store_; + + std::mutex mtx_; + +public: + KeyStore(std::size_t totalKeys, std::size_t numMarkers) : keys_(etl::getMarkers(totalKeys)) + { + auto const totalPerMarker = totalKeys / numMarkers; + auto const markers = etl::getMarkers(numMarkers); + for (auto mi = 0uz; mi < markers.size(); ++mi) { + for (auto i = 0uz; i < totalPerMarker; ++i) { + auto const mapKey = ripple::strHex(markers.at(mi)).substr(0, 2); + store_[mapKey].push(keys_.at(mi * totalPerMarker + i)); + } + } + } + + std::optional + next(std::string const& marker) + { + std::scoped_lock lock(mtx_); + + auto const mapKey = ripple::strHex(marker).substr(0, 2); + auto k = store_.lower_bound(mapKey); + ASSERT(k != store_.end(), "Lower bound not found for '{}'", mapKey); + + auto& q = k->second; + if (q.empty()) + return std::nullopt; + + auto t = q.front(); + q.pop(); + + return std::make_optional(reinterpret_cast(t.data()), ripple::uint256::size()); + }; + + std::optional + peek(std::string const& marker) + { + std::scoped_lock lock(mtx_); + + auto const mapKey = ripple::strHex(marker).substr(0, 2); + auto k = store_.lower_bound(mapKey); + ASSERT(k != store_.end(), "Lower bound not found for '{}'", mapKey); + + auto& q = k->second; + if (q.empty()) + return std::nullopt; + + auto t = q.front(); + return std::make_optional(reinterpret_cast(t.data()), ripple::uint256::size()); + }; +}; + +} // namespace + +TEST_F(GrpcSourceNgTests, BasicFetchLedger) +{ + uint32_t const sequence = 123; + bool const getObjects = true; + bool const getObjectNeighbors = false; + + EXPECT_CALL(mockXrpLedgerAPIService, GetLedger) + .WillOnce([&](grpc::ServerContext* /*context*/, + org::xrpl::rpc::v1::GetLedgerRequest const* request, + org::xrpl::rpc::v1::GetLedgerResponse* response) { + EXPECT_EQ(request->ledger().sequence(), sequence); + EXPECT_TRUE(request->transactions()); + EXPECT_TRUE(request->expand()); + EXPECT_EQ(request->get_objects(), getObjects); + EXPECT_EQ(request->get_object_neighbors(), 
getObjectNeighbors); + EXPECT_EQ(request->user(), "ETL"); + response->set_validated(true); + response->set_is_unlimited(false); + response->set_object_neighbors_included(false); + return grpc::Status{}; + }); + auto const [status, response] = grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors); + ASSERT_TRUE(status.ok()); + EXPECT_TRUE(response.validated()); + EXPECT_FALSE(response.is_unlimited()); + EXPECT_FALSE(response.object_neighbors_included()); +} + +struct GrpcSourceLoadInitialLedgerTests : GrpcSourceNgTests { + uint32_t const sequence_ = 123; + uint32_t const numMarkers_ = 4; + bool const cacheOnly_ = false; +}; + +TEST_F(GrpcSourceLoadInitialLedgerTests, GetLedgerDataNotFound) +{ + EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData) + .Times(numMarkers_) + .WillRepeatedly([&](grpc::ServerContext* /*context*/, + org::xrpl::rpc::v1::GetLedgerDataRequest const* request, + org::xrpl::rpc::v1::GetLedgerDataResponse* /*response*/) { + EXPECT_EQ(request->ledger().sequence(), sequence_); + EXPECT_EQ(request->user(), "ETL"); + return grpc::Status{grpc::StatusCode::NOT_FOUND, "Not found"}; + }); + + auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, loader_); + EXPECT_TRUE(data.empty()); + EXPECT_FALSE(success); +} + +TEST_F(GrpcSourceLoadInitialLedgerTests, LoaderCalledCorrectly) +{ + auto const key = ripple::uint256{4}; + std::string const keyStr{reinterpret_cast(key.data()), ripple::uint256::size()}; + auto const object = CreateTicketLedgerObject("rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn", sequence_); + auto const objectData = object.getSerializer().peekData(); + + EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData) + .Times(numMarkers_) + .WillRepeatedly([&](grpc::ServerContext* /*context*/, + org::xrpl::rpc::v1::GetLedgerDataRequest const* request, + org::xrpl::rpc::v1::GetLedgerDataResponse* response) { + EXPECT_EQ(request->ledger().sequence(), sequence_); + EXPECT_EQ(request->user(), "ETL"); + + 
response->set_is_unlimited(true); + auto newObject = response->mutable_ledger_objects()->add_objects(); + newObject->set_key(reinterpret_cast(key.data()), ripple::uint256::size()); + newObject->set_data(objectData.data(), objectData.size()); + + return grpc::Status{}; + }); + + EXPECT_CALL(loader_, onInitialLoadGotMoreObjects) + .Times(numMarkers_) + .WillRepeatedly([&](uint32_t, std::vector const& data, std::optional lastKey) { + EXPECT_FALSE(lastKey.has_value()); + EXPECT_EQ(data.size(), 1); + }); + + auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, loader_); + + EXPECT_TRUE(success); + EXPECT_EQ(data.size(), numMarkers_); + + EXPECT_EQ(data, std::vector(4, keyStr)); +} + +TEST_F(GrpcSourceLoadInitialLedgerTests, DataTransferredAndLoaderCalledCorrectly) +{ + auto const totalKeys = 256uz; + auto const totalPerMarker = totalKeys / numMarkers_; + auto const batchSize = totalPerMarker / 4uz; + auto const batchesPerMarker = totalPerMarker / batchSize; + + auto keyStore = KeyStore(totalKeys, numMarkers_); + + auto const object = CreateTicketLedgerObject("rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn", sequence_); + auto const objectData = object.getSerializer().peekData(); + + EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData) + .Times(numMarkers_ * batchesPerMarker) + .WillRepeatedly([&](grpc::ServerContext* /*context*/, + org::xrpl::rpc::v1::GetLedgerDataRequest const* request, + org::xrpl::rpc::v1::GetLedgerDataResponse* response) { + EXPECT_EQ(request->ledger().sequence(), sequence_); + EXPECT_EQ(request->user(), "ETL"); + + response->set_is_unlimited(true); + + auto next = request->marker().empty() ? 
std::string("00") : request->marker(); + for (auto i = 0uz; i < batchSize; ++i) { + if (auto maybeLastKey = keyStore.next(next); maybeLastKey.has_value()) { + next = *maybeLastKey; + + auto newObject = response->mutable_ledger_objects()->add_objects(); + newObject->set_key(next); + newObject->set_data(objectData.data(), objectData.size()); + } + } + + if (auto maybeNext = keyStore.peek(next); maybeNext.has_value()) + response->set_marker(*maybeNext); // hack + + return grpc::Status::OK; + }); + + std::atomic_int total = 0; + testing::InSequence seqGuard; + + EXPECT_CALL(loader_, onInitialLoadGotMoreObjects) + .Times(numMarkers_) + .WillRepeatedly([&](uint32_t, std::vector const& data, std::optional lastKey) { + EXPECT_LE(data.size(), batchSize); + EXPECT_FALSE(lastKey.has_value()); + total += data.size(); + }); + + EXPECT_CALL(loader_, onInitialLoadGotMoreObjects) + .Times((numMarkers_ - 1) * batchesPerMarker) + .WillRepeatedly([&](uint32_t, std::vector const& data, std::optional lastKey) { + EXPECT_LE(data.size(), batchSize); + EXPECT_TRUE(lastKey.has_value()); + total += data.size(); + }); + + auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, loader_); + + EXPECT_TRUE(success); + EXPECT_EQ(data.size(), numMarkers_); + EXPECT_EQ(total, totalKeys); +}