Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tonNode.getOutMsgQueueProof query in public shard overlays #1413

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions create-hardfork/create-hardfork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ class HardforkCreator : public td::actor::Actor {
void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override {
}
void download_out_msg_queue_proof(
ton::ShardIdFull dst_shard, std::vector<ton::BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<std::vector<td::Ref<ton::validator::OutMsgQueueProof>>> promise) override {
}

void new_key_block(ton::validator::BlockHandle handle) override {
}
Expand Down
6 changes: 6 additions & 0 deletions crypto/block/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,12 @@ bool EnqueuedMsgDescr::check_key(td::ConstBitPtr key) const {
hash_ == key + 96;
}

bool ImportedMsgQueueLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0xd3 // imported_msg_queue_limits#d3
&& cs.fetch_uint_to(32, max_bytes) // max_bytes:#
&& cs.fetch_uint_to(32, max_msgs); // max_msgs:#
}

bool ParamLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0xc3 // param_limits#c3
&& cs.fetch_uint_to(32, limits_[0]) // underload:uint32
Expand Down
10 changes: 10 additions & 0 deletions crypto/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,16 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC
return proc_coll.print(os);
}

struct ImportedMsgQueueLimits {
// Default values
td::uint32 max_bytes = 1 << 16;
td::uint32 max_msgs = 30;
bool deserialize(vm::CellSlice& cs);
ImportedMsgQueueLimits operator*(td::uint32 x) const {
return {max_bytes * x, max_msgs * x};
}
};

struct ParamLimits {
enum { limits_cnt = 4 };
enum { cl_underload = 0, cl_normal = 1, cl_soft = 2, cl_medium = 3, cl_hard = 4 };
Expand Down
15 changes: 15 additions & 0 deletions tdutils/td/utils/StringBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,19 @@ std::enable_if_t<std::is_arithmetic<T>::value, string> to_string(const T &x) {
return sb.as_cslice().str();
}

template <class SB>
struct LambdaPrintHelper {
SB& sb;
};
template <class SB, class F>
SB& operator<<(const LambdaPrintHelper<SB>& helper, F&& f) {
f(helper.sb);
return helper.sb;
}
struct LambdaPrint {};

inline LambdaPrintHelper<td::StringBuilder> operator<<(td::StringBuilder& sb, const LambdaPrint&) {
return LambdaPrintHelper<td::StringBuilder>{sb};
}

} // namespace td
10 changes: 7 additions & 3 deletions tdutils/td/utils/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@

#define LOG(level) LOG_IMPL(level, level, true, ::td::Slice())
#define LOG_IF(level, condition) LOG_IMPL(level, level, condition, #condition)
#define FLOG(level) LOG_IMPL(level, level, true, ::td::Slice()) << td::LambdaPrint{} << [&](auto &sb)

#define VLOG(level) LOG_IMPL(DEBUG, level, true, TD_DEFINE_STR(level))
#define VLOG_IF(level, condition) LOG_IMPL(DEBUG, level, condition, TD_DEFINE_STR(level) " " #condition)
Expand All @@ -95,13 +96,13 @@ inline bool no_return_func() {
#define DUMMY_LOG_CHECK(condition) LOG_IF(NEVER, !(condition))

#ifdef TD_DEBUG
#if TD_MSVC
#if TD_MSVC
#define LOG_CHECK(condition) \
__analysis_assume(!!(condition)); \
LOG_IMPL(FATAL, FATAL, !(condition), #condition)
#else
#else
#define LOG_CHECK(condition) LOG_IMPL(FATAL, FATAL, !(condition) && no_return_func(), #condition)
#endif
#endif
#else
#define LOG_CHECK DUMMY_LOG_CHECK
#endif
Expand Down Expand Up @@ -263,6 +264,9 @@ class Logger {
sb_ << other;
return *this;
}
LambdaPrintHelper<td::Logger> operator<<(const LambdaPrint &) {
return LambdaPrintHelper<td::Logger>{*this};
}

MutableCSlice as_cslice() {
return sb_.as_cslice();
Expand Down
4 changes: 4 additions & 0 deletions test/test-ton-collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ class TestNode : public td::actor::Actor {
void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override {
}
void download_out_msg_queue_proof(
ton::ShardIdFull dst_shard, std::vector<ton::BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<std::vector<td::Ref<ton::validator::OutMsgQueueProof>>> promise) override {
}

void new_key_block(ton::validator::BlockHandle handle) override {
}
Expand Down
6 changes: 6 additions & 0 deletions tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,10 @@ tonNode.success = tonNode.Success;
tonNode.archiveNotFound = tonNode.ArchiveInfo;
tonNode.archiveInfo id:long = tonNode.ArchiveInfo;

tonNode.importedMsgQueueLimits max_bytes:int max_msgs:int = ImportedMsgQueueLimits;
tonNode.outMsgQueueProof queue_proofs:bytes block_state_proofs:bytes msg_counts:(vector int) = tonNode.OutMsgQueueProof;
tonNode.outMsgQueueProofEmpty = tonNode.OutMsgQueueProof;

tonNode.forgetPeer = tonNode.ForgetPeer;

---functions---
Expand Down Expand Up @@ -483,6 +487,8 @@ tonNode.downloadKeyBlockProofLink block:tonNode.blockIdExt = tonNode.Data;
tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo;
tonNode.getShardArchiveInfo masterchain_seqno:int shard_prefix:tonNode.shardId = tonNode.ArchiveInfo;
tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data;
tonNode.getOutMsgQueueProof dst_shard:tonNode.shardId blocks:(vector tonNode.blockIdExt)
limits:tonNode.importedMsgQueueLimits = tonNode.OutMsgQueueProof;

tonNode.getCapabilities = tonNode.Capabilities;

Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
1 change: 1 addition & 0 deletions validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ set(VALIDATOR_HEADERS
interfaces/db.h
interfaces/external-message.h
interfaces/liteserver.h
interfaces/out-msg-queue-proof.h
interfaces/proof.h
interfaces/shard.h
interfaces/signature-set.h
Expand Down
98 changes: 98 additions & 0 deletions validator/full-node-shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "net/download-proof.hpp"
#include "net/get-next-key-blocks.hpp"
#include "net/download-archive-slice.hpp"
#include "impl/out-msg-queue-proof.hpp"

#include "td/utils/Random.h"

Expand Down Expand Up @@ -669,6 +670,62 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
query.offset_, query.max_size_, std::move(promise));
}

void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query,
td::Promise<td::BufferSlice> promise) {
std::vector<BlockIdExt> blocks;
for (const auto &x : query.blocks_) {
BlockIdExt id = create_block_id(x);
if (!id.is_valid_ext()) {
promise.set_error(td::Status::Error("invalid block_id"));
return;
}
if (!shard_is_ancestor(shard_, id.shard_full())) {
promise.set_error(td::Status::Error("query in wrong overlay"));
return;
}
blocks.push_back(create_block_id(x));
}
ShardIdFull dst_shard = create_shard_id(query.dst_shard_);
if (!dst_shard.is_valid_ext()) {
promise.set_error(td::Status::Error("invalid shard"));
return;
}
block::ImportedMsgQueueLimits limits{(td::uint32)query.limits_->max_bytes_, (td::uint32)query.limits_->max_msgs_};
if (limits.max_msgs > 512) {
promise.set_error(td::Status::Error("max_msgs is too big"));
return;
}
if (limits.max_bytes > (1 << 21)) {
promise.set_error(td::Status::Error("max_bytes is too big"));
return;
}
FLOG(DEBUG) {
sb << "Got query getOutMsgQueueProof to shard " << dst_shard.to_str() << " from blocks";
for (const BlockIdExt &id : blocks) {
sb << " " << id.id.to_str();
}
sb << " from " << src;
};
td::actor::send_closure(
full_node_, &FullNode::get_out_msg_queue_query_token,
[=, manager = validator_manager_, blocks = std::move(blocks),
promise = std::move(promise)](td::Result<std::unique_ptr<ActionToken>> R) mutable {
TRY_RESULT_PROMISE(promise, token, std::move(R));
auto P =
td::PromiseCreator::lambda([promise = std::move(promise), token = std::move(token)](
td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> R) mutable {
if (R.is_error()) {
promise.set_result(create_serialize_tl_object<ton_api::tonNode_outMsgQueueProofEmpty>());
} else {
promise.set_result(serialize_tl_object(R.move_as_ok(), true));
}
});
td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", dst_shard, std::move(blocks), limits, manager,
std::move(P))
.release();
});
}

void FullNodeShardImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query,
td::Promise<td::BufferSlice> promise) {
if (!active_) {
Expand Down Expand Up @@ -944,6 +1001,47 @@ void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFu
.release();
}

void FullNodeShardImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
// TODO: maybe more complex download (like other requests here)
auto &b = choose_neighbour();
if (b.adnl_id == adnl::AdnlNodeIdShort::zero()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "no nodes"));
return;
}
std::vector<tl_object_ptr<ton_api::tonNode_blockIdExt>> blocks_tl;
for (const BlockIdExt &id : blocks) {
blocks_tl.push_back(create_tl_block_id(id));
}
td::BufferSlice query = create_serialize_tl_object<ton_api::tonNode_getOutMsgQueueProof>(
create_tl_shard_id(dst_shard), std::move(blocks_tl),
create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(limits.max_bytes, limits.max_msgs));

auto P = td::PromiseCreator::lambda(
[=, promise = std::move(promise), blocks = std::move(blocks)](td::Result<td::BufferSlice> R) mutable {
if (R.is_error()) {
promise.set_result(R.move_as_error());
return;
}
TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::tonNode_OutMsgQueueProof>(R.move_as_ok(), true));
ton_api::downcast_call(
*f, td::overloaded(
[&](ton_api::tonNode_outMsgQueueProofEmpty &x) {
promise.set_error(td::Status::Error("node doesn't have this block"));
},
[&](ton_api::tonNode_outMsgQueueProof &x) {
delay_action(
[=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable {
promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x));
},
td::Timestamp::now());
}));
});
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_,
"get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_);
}

void FullNodeShardImpl::set_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
CHECK(!handle_);
handle_ = std::move(handle);
Expand Down
3 changes: 3 additions & 0 deletions validator/full-node-shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class FullNodeShard : public td::actor::Actor {
td::Promise<std::vector<BlockIdExt>> promise) = 0;
virtual void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) = 0;
virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) = 0;

virtual void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) = 0;

Expand Down
7 changes: 5 additions & 2 deletions validator/full-node-shard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ class FullNodeShardImpl : public FullNodeShard {
td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveSlice &query,
td::Promise<td::BufferSlice> promise);
// void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_prepareNextKeyBlockProof &query,
// td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query,
td::Promise<td::BufferSlice> promise);
void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query, td::Promise<td::BufferSlice> promise);
void receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data);

Expand Down Expand Up @@ -183,6 +183,9 @@ class FullNodeShardImpl : public FullNodeShard {
td::Promise<std::vector<BlockIdExt>> promise) override;
void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override;
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override;

void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) override;

Expand Down
30 changes: 30 additions & 0 deletions validator/full-node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "td/actor/MultiPromise.h"
#include "full-node.h"
#include "common/delay.h"
#include "impl/out-msg-queue-proof.hpp"
#include "td/utils/Random.h"
#include "ton/ton-tl.hpp"

Expand Down Expand Up @@ -430,6 +431,24 @@ void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFull sh
timeout, std::move(promise));
}

void FullNodeImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
if (blocks.empty()) {
promise.set_value({});
return;
}
// All blocks are expected to have the same minsplit shard prefix
auto shard = get_shard(blocks[0].shard_full());
if (shard.empty()) {
VLOG(FULL_NODE_WARNING) << "dropping download msg queue query to unknown shard";
promise.set_error(td::Status::Error(ErrorCode::notready, "shard not ready"));
return;
}
td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits,
timeout, std::move(promise));
}

td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(ShardIdFull shard) {
if (shard.is_masterchain()) {
return shards_[ShardIdFull{masterchainId}].actor.get();
Expand Down Expand Up @@ -557,6 +576,11 @@ void FullNodeImpl::process_block_candidate_broadcast(BlockIdExt block_id, Catcha
std::move(data));
}

void FullNodeImpl::get_out_msg_queue_query_token(td::Promise<std::unique_ptr<ActionToken>> promise) {
td::actor::send_closure(out_msg_queue_query_token_manager_, &TokenManager::get_token, 1, 0, td::Timestamp::in(10.0),
std::move(promise));
}

void FullNodeImpl::set_validator_telemetry_filename(std::string value) {
validator_telemetry_filename_ = std::move(value);
update_validator_telemetry_collector();
Expand Down Expand Up @@ -645,6 +669,12 @@ void FullNodeImpl::start_up() {
td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, shard_prefix, std::move(tmp_dir),
timeout, std::move(promise));
}
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override {
td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits,
timeout, std::move(promise));
}

void new_key_block(BlockHandle handle) override {
td::actor::send_closure(id_, &FullNodeImpl::new_key_block, std::move(handle));
Expand Down
1 change: 1 addition & 0 deletions validator/full-node.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class FullNode : public td::actor::Actor {
virtual void process_block_broadcast(BlockBroadcast broadcast) = 0;
virtual void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::BufferSlice data) = 0;
virtual void get_out_msg_queue_query_token(td::Promise<std::unique_ptr<ActionToken>> promise) = 0;

virtual void set_validator_telemetry_filename(std::string value) = 0;

Expand Down
8 changes: 8 additions & 0 deletions validator/full-node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <map>
#include <set>
#include <queue>
#include <token-manager.h>

namespace ton {

Expand Down Expand Up @@ -79,6 +80,9 @@ class FullNodeImpl : public FullNode {
void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise<std::vector<BlockIdExt>> promise);
void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise);
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise);

void got_key_block_config(td::Ref<ConfigHolder> config);
void new_key_block(BlockHandle handle);
Expand All @@ -87,6 +91,7 @@ class FullNodeImpl : public FullNode {
void process_block_broadcast(BlockBroadcast broadcast) override;
void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) override;
void get_out_msg_queue_query_token(td::Promise<std::unique_ptr<ActionToken>> promise) override;

void set_validator_telemetry_filename(std::string value) override;

Expand Down Expand Up @@ -160,6 +165,9 @@ class FullNodeImpl : public FullNode {
PublicKeyHash validator_telemetry_collector_key_ = PublicKeyHash::zero();

void update_validator_telemetry_collector();

td::actor::ActorOwn<TokenManager> out_msg_queue_query_token_manager_ =
td::actor::create_actor<TokenManager>("tokens", /* max_tokens = */ 1);
};

} // namespace fullnode
Expand Down
Loading
Loading