-
Notifications
You must be signed in to change notification settings - Fork 55
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement GrpcSource and underlying AsyncGrpcCall
- Loading branch information
Showing
9 changed files
with
854 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
add_library(clio_etlng) | ||
|
||
target_sources(clio_etlng PRIVATE impl/Extraction.cpp) | ||
target_sources(clio_etlng PRIVATE impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp) | ||
|
||
target_link_libraries(clio_etlng PUBLIC clio_data) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
//------------------------------------------------------------------------------ | ||
/* | ||
This file is part of clio: https://github.com/XRPLF/clio | ||
Copyright (c) 2024, the clio developers. | ||
Permission to use, copy, modify, and distribute this software for any | ||
purpose with or without fee is hereby granted, provided that the above | ||
copyright notice and this permission notice appear in all copies. | ||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | ||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
*/ | ||
//============================================================================== | ||
|
||
#pragma once | ||
|
||
#include "etlng/Models.hpp" | ||
|
||
#include <xrpl/protocol/LedgerHeader.h> | ||
|
||
#include <cstdint> | ||
#include <optional> | ||
#include <string> | ||
#include <vector> | ||
|
||
namespace etlng { | ||
|
||
/** | ||
* @brief The interface for observing the initial ledger load | ||
*/ | ||
struct InitialLoadObserverInterface { | ||
virtual ~InitialLoadObserverInterface() = default; | ||
|
||
/** | ||
* @brief Callback for each incoming batch of objects during initial ledger load | ||
* | ||
* @param seq The sequence for this batch of objects | ||
* @param data The batch of objects | ||
* @param lastKey The last key of the previous batch if there was one | ||
*/ | ||
virtual void | ||
onInitialLoadGotMoreObjects( | ||
uint32_t seq, | ||
std::vector<model::Object> const& data, | ||
std::optional<std::string> lastKey = std::nullopt | ||
) = 0; | ||
}; | ||
|
||
} // namespace etlng |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
//------------------------------------------------------------------------------ | ||
/* | ||
This file is part of clio: https://github.com/XRPLF/clio | ||
Copyright (c) 2024, the clio developers. | ||
Permission to use, copy, modify, and distribute this software for any | ||
purpose with or without fee is hereby granted, provided that the above | ||
copyright notice and this permission notice appear in all copies. | ||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | ||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
*/ | ||
//============================================================================== | ||
|
||
#include "etlng/impl/AsyncGrpcCall.hpp" | ||
|
||
#include "etl/ETLHelpers.hpp" | ||
#include "etlng/InitialLoadObserverInterface.hpp" | ||
#include "etlng/Models.hpp" | ||
#include "etlng/impl/Extraction.hpp" | ||
#include "util/Assert.hpp" | ||
#include "util/log/Logger.hpp" | ||
|
||
#include <grpcpp/client_context.h> | ||
#include <grpcpp/grpcpp.h> | ||
#include <grpcpp/support/status.h> | ||
#include <xrpl/basics/base_uint.h> | ||
#include <xrpl/basics/strHex.h> | ||
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h> | ||
|
||
#include <cstddef> | ||
#include <cstdint> | ||
#include <memory> | ||
#include <optional> | ||
#include <string> | ||
#include <utility> | ||
#include <vector> | ||
|
||
namespace etlng::impl { | ||
|
||
AsyncGrpcCall::AsyncGrpcCall( | ||
uint32_t seq, | ||
ripple::uint256 const& marker, | ||
std::optional<ripple::uint256> const& nextMarker | ||
) | ||
{ | ||
request_.set_user("ETL"); | ||
request_.mutable_ledger()->set_sequence(seq); | ||
|
||
if (marker.isNonZero()) | ||
request_.set_marker(marker.data(), ripple::uint256::size()); | ||
|
||
nextPrefix_ = nextMarker ? nextMarker->data()[0] : 0x00; | ||
auto const prefix = marker.data()[0]; | ||
|
||
LOG(log_.debug()) << "Setting up AsyncGrpcCall. marker = " << ripple::strHex(marker) | ||
<< ". prefix = " << ripple::strHex(std::string(1, prefix)) | ||
<< ". nextPrefix_ = " << ripple::strHex(std::string(1, nextPrefix_)); | ||
|
||
ASSERT( | ||
nextPrefix_ > prefix or nextPrefix_ == 0x00, | ||
"Next prefix must be greater than current prefix. Got: nextPrefix_ = {}, prefix = {}", | ||
nextPrefix_, | ||
prefix | ||
); | ||
|
||
cur_ = std::make_unique<ResponseType>(); | ||
next_ = std::make_unique<ResponseType>(); | ||
context_ = std::make_unique<grpc::ClientContext>(); | ||
} | ||
|
||
AsyncGrpcCall::CallStatus | ||
AsyncGrpcCall::process( | ||
std::unique_ptr<AsyncGrpcCall::StubType>& stub, | ||
grpc::CompletionQueue& cq, | ||
etlng::InitialLoadObserverInterface& loader, | ||
bool abort | ||
) | ||
{ | ||
LOG(log_.trace()) << "Processing response. " | ||
<< "Marker prefix = " << getMarkerPrefix(); | ||
|
||
if (abort) { | ||
LOG(log_.error()) << "AsyncGrpcCall aborted"; | ||
return CallStatus::ERRORED; | ||
} | ||
|
||
if (!status_.ok()) { | ||
LOG(log_.error()) << "AsyncGrpcCall status_ not ok: code = " << status_.error_code() | ||
<< " message = " << status_.error_message(); | ||
|
||
return CallStatus::ERRORED; | ||
} | ||
|
||
if (!next_->is_unlimited()) { | ||
LOG(log_.warn()) << "AsyncGrpcCall is_unlimited is false. " | ||
<< "Make sure secure_gateway is set correctly at the ETL source"; | ||
} | ||
|
||
std::swap(cur_, next_); | ||
auto more = true; | ||
|
||
// if no marker returned, we are done | ||
if (cur_->marker().empty()) | ||
more = false; | ||
|
||
// if returned marker is greater than our end, we are done | ||
auto const prefix = cur_->marker()[0]; | ||
if (nextPrefix_ != 0x00 && prefix >= nextPrefix_) | ||
more = false; | ||
|
||
// if we are not done, make the next async call | ||
if (more) { | ||
request_.set_marker(cur_->marker()); | ||
call(stub, cq); | ||
} | ||
|
||
auto const numObjects = cur_->ledger_objects().objects_size(); | ||
std::vector<etlng::model::Object> data; | ||
data.reserve(numObjects); | ||
|
||
for (int i = 0; i < numObjects; ++i) { | ||
auto obj = std::move(*(cur_->mutable_ledger_objects()->mutable_objects(i))); | ||
if (!more && nextPrefix_ != 0x00) { | ||
if (static_cast<unsigned char>(obj.key()[0]) >= nextPrefix_) | ||
continue; | ||
} | ||
|
||
lastKey_ = obj.key(); // this will end up the last key we actually touched eventually | ||
data.push_back(etlng::impl::extractObj(std::move(obj))); | ||
} | ||
|
||
if (not data.empty()) | ||
loader.onInitialLoadGotMoreObjects(request_.ledger().sequence(), data, predecessorKey_); | ||
|
||
predecessorKey_ = lastKey_; // but for ongoing onInitialObjects calls we need to pass along the key we left | ||
// off at so that we can link the two lists correctly | ||
|
||
return more ? CallStatus::MORE : CallStatus::DONE; | ||
} | ||
|
||
void | ||
AsyncGrpcCall::call(std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub, grpc::CompletionQueue& cq) | ||
{ | ||
context_ = std::make_unique<grpc::ClientContext>(); | ||
auto rpc = stub->PrepareAsyncGetLedgerData(context_.get(), request_, &cq); | ||
|
||
rpc->StartCall(); | ||
rpc->Finish(next_.get(), &status_, this); | ||
} | ||
|
||
std::string | ||
AsyncGrpcCall::getMarkerPrefix() | ||
{ | ||
return next_->marker().empty() ? std::string{} : ripple::strHex(std::string{next_->marker().data()[0]}); | ||
} | ||
|
||
// this is used to generate edgeKeys - keys that were the last one in the onInitialObjects list | ||
// then we write them all in one go getting the successor from the cache once it's full | ||
std::string | ||
AsyncGrpcCall::getLastKey() | ||
{ | ||
return lastKey_; | ||
} | ||
|
||
std::vector<AsyncGrpcCall> | ||
AsyncGrpcCall::makeAsyncCalls(uint32_t const sequence, uint32_t const numMarkers) | ||
{ | ||
auto const markers = etl::getMarkers(numMarkers); | ||
|
||
std::vector<AsyncGrpcCall> result; | ||
result.reserve(markers.size()); | ||
|
||
for (size_t i = 0; i + 1 < markers.size(); ++i) | ||
result.emplace_back(sequence, markers[i], markers[i + 1]); | ||
|
||
if (not markers.empty()) | ||
result.emplace_back(sequence, markers.back(), std::nullopt); | ||
|
||
return result; | ||
} | ||
|
||
} // namespace etlng::impl |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
//------------------------------------------------------------------------------ | ||
/* | ||
This file is part of clio: https://github.com/XRPLF/clio | ||
Copyright (c) 2024, the clio developers. | ||
Permission to use, copy, modify, and distribute this software for any | ||
purpose with or without fee is hereby granted, provided that the above | ||
copyright notice and this permission notice appear in all copies. | ||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | ||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
*/ | ||
//============================================================================== | ||
|
||
#pragma once | ||
|
||
#include "etlng/InitialLoadObserverInterface.hpp" | ||
#include "util/log/Logger.hpp" | ||
|
||
#include <grpcpp/client_context.h> | ||
#include <grpcpp/support/status.h> | ||
#include <xrpl/basics/base_uint.h> | ||
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger_data.pb.h> | ||
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h> | ||
|
||
#include <cstdint> | ||
#include <memory> | ||
#include <optional> | ||
#include <string> | ||
#include <vector> | ||
|
||
namespace etlng::impl { | ||
|
||
class AsyncGrpcCall { | ||
public: | ||
enum class CallStatus { MORE, DONE, ERRORED }; | ||
using RequestType = org::xrpl::rpc::v1::GetLedgerDataRequest; | ||
using ResponseType = org::xrpl::rpc::v1::GetLedgerDataResponse; | ||
using StubType = org::xrpl::rpc::v1::XRPLedgerAPIService::Stub; | ||
|
||
private: | ||
util::Logger log_{"ETL"}; | ||
|
||
std::unique_ptr<ResponseType> cur_; | ||
std::unique_ptr<ResponseType> next_; | ||
|
||
RequestType request_; | ||
std::unique_ptr<grpc::ClientContext> context_; | ||
|
||
grpc::Status status_; | ||
unsigned char nextPrefix_; | ||
|
||
std::string lastKey_; | ||
std::optional<std::string> predecessorKey_; | ||
|
||
public: | ||
AsyncGrpcCall(uint32_t seq, ripple::uint256 const& marker, std::optional<ripple::uint256> const& nextMarker); | ||
|
||
static std::vector<AsyncGrpcCall> | ||
makeAsyncCalls(uint32_t const sequence, uint32_t const numMarkers); | ||
|
||
CallStatus | ||
process( | ||
std::unique_ptr<StubType>& stub, | ||
grpc::CompletionQueue& cq, | ||
etlng::InitialLoadObserverInterface& loader, | ||
bool abort | ||
); | ||
|
||
void | ||
call(std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub, grpc::CompletionQueue& cq); | ||
|
||
std::string | ||
getMarkerPrefix(); | ||
|
||
std::string | ||
getLastKey(); | ||
}; | ||
|
||
} // namespace etlng::impl |
Oops, something went wrong.