Skip to content

Commit

Permalink
Improve block broadcasts processing; add special overlay for blocks f…
Browse files Browse the repository at this point in the history
…or validators (#885)

* Improve block broadcast processing

* ValidatorManagerImpl::written_handle
* Retry sending broadcasts in ValidatorGroup
* Fix setting channel_ready in AdnlPeerPair

* Add special overlay for validators for block broadcasting (#842)

* Private overlay for broadcasting blocks

---------

Co-authored-by: SpyCheese <[email protected]>

(cherry picked from commit a52045b)

---------

Co-authored-by: SpyCheese <[email protected]>
  • Loading branch information
EmelyanenkoK and SpyCheese authored Feb 1, 2024
1 parent a11ffb1 commit 59927ba
Show file tree
Hide file tree
Showing 16 changed files with 410 additions and 30 deletions.
4 changes: 4 additions & 0 deletions adnl/adnl-peer-table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class AdnlPeerTableImpl : public AdnlPeerTable {
td::actor::ActorId<AdnlChannel> channel) override;
void unregister_channel(AdnlChannelIdShort id) override;

void check_id_exists(AdnlNodeIdShort id, td::Promise<bool> promise) override {
promise.set_value(local_ids_.count(id));
}

void write_new_addr_list_to_db(AdnlNodeIdShort local_id, AdnlNodeIdShort peer_id, AdnlDbItem node,
td::Promise<td::Unit> promise) override;
void get_addr_list_from_db(AdnlNodeIdShort local_id, AdnlNodeIdShort peer_id,
Expand Down
4 changes: 3 additions & 1 deletion adnl/adnl-peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ void AdnlPeerPairImpl::receive_packet_from_channel(AdnlChannelIdShort id, AdnlPa
VLOG(ADNL_NOTICE) << this << ": dropping IN message: outdated channel id" << id;
return;
}
channel_ready_ = true;
if (channel_inited_) {
channel_ready_ = true;
}
receive_packet_checked(std::move(packet));
}

Expand Down
2 changes: 2 additions & 0 deletions adnl/adnl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class Adnl : public AdnlSenderInterface {
virtual void add_id_ex(AdnlNodeIdFull id, AdnlAddressList addr_list, td::uint8 cat, td::uint32 mode) = 0;
virtual void del_id(AdnlNodeIdShort id, td::Promise<td::Unit> promise) = 0;

virtual void check_id_exists(AdnlNodeIdShort id, td::Promise<bool> promise) = 0;

// subscribe to (some) messages(+queries) to this local id
virtual void subscribe(AdnlNodeIdShort dst, std::string prefix, std::unique_ptr<Callback> callback) = 0;
virtual void unsubscribe(AdnlNodeIdShort dst, std::string prefix) = 0;
Expand Down
2 changes: 2 additions & 0 deletions tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ tonNode.newShardBlockBroadcast block:tonNode.newShardBlock = tonNode.Broadcast;

tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId;

tonNode.privateBlockOverlayId zero_state_file_hash:int256 nodes:(vector int256) = tonNode.PrivateBlockOverlayId;

tonNode.keyBlocks blocks:(vector tonNode.blockIdExt) incomplete:Bool error:Bool = tonNode.KeyBlocks;

ton.blockId root_cell_hash:int256 file_hash:int256 = ton.BlockId;
Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
8 changes: 8 additions & 0 deletions ton/ton-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,14 @@ struct BlockBroadcast {
td::uint32 validator_set_hash;
td::BufferSlice data;
td::BufferSlice proof;

BlockBroadcast clone() const {
std::vector<BlockSignature> new_signatures;
for (const BlockSignature& s : signatures) {
new_signatures.emplace_back(s.node, s.signature.clone());
}
return {block_id, std::move(new_signatures), catchain_seqno, validator_set_hash, data.clone(), proof.clone()};
}
};

struct Ed25519_PrivateKey {
Expand Down
4 changes: 3 additions & 1 deletion validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ set(FULL_NODE_SOURCE
full-node-master.h
full-node-master.hpp
full-node-master.cpp

full-node-private-overlay.hpp
full-node-private-overlay.cpp

net/download-block.hpp
net/download-block.cpp
net/download-block-new.hpp
Expand Down
22 changes: 16 additions & 6 deletions validator/downloaders/wait-block-state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,14 @@ void WaitBlockState::start() {
} else if (!handle_->inited_prev() || (!handle_->inited_proof() && !handle_->inited_proof_link())) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle = handle_](td::Result<td::BufferSlice> R) {
if (R.is_error()) {
delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::start); }, td::Timestamp::in(0.1));
delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link); },
td::Timestamp::in(0.1));
} else {
td::actor::send_closure(SelfId, &WaitBlockState::got_proof_link, R.move_as_ok());
}
});

waiting_proof_link_ = true;
td::actor::send_closure(manager_, &ValidatorManager::send_get_block_proof_link_request, handle_->id(), priority_,
std::move(P));
} else if (prev_state_.is_null()) {
Expand All @@ -133,12 +135,14 @@ void WaitBlockState::start() {
} else if (handle_->id().is_masterchain() && !handle_->inited_proof()) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle = handle_](td::Result<td::BufferSlice> R) {
if (R.is_error()) {
delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::start); }, td::Timestamp::in(0.1));
delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof); },
td::Timestamp::in(0.1));
} else {
td::actor::send_closure(SelfId, &WaitBlockState::got_proof, R.move_as_ok());
}
});

waiting_proof_ = true;
td::actor::send_closure(manager_, &ValidatorManager::send_get_block_proof_request, handle_->id(), priority_,
std::move(P));
} else if (block_.is_null()) {
Expand Down Expand Up @@ -172,6 +176,9 @@ void WaitBlockState::got_prev_state(td::Ref<ShardState> state) {
}

void WaitBlockState::got_proof_link(td::BufferSlice data) {
if (!waiting_proof_link_) {
return;
}
auto R = create_proof_link(handle_->id(), std::move(data));
if (R.is_error()) {
LOG(INFO) << "received bad proof link: " << R.move_as_error();
Expand All @@ -182,22 +189,25 @@ void WaitBlockState::got_proof_link(td::BufferSlice data) {
if (R.is_ok()) {
auto h = R.move_as_ok();
CHECK(h->inited_prev());
td::actor::send_closure(SelfId, &WaitBlockState::start);
td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link);
} else {
LOG(INFO) << "received bad proof link: " << R.move_as_error();
td::actor::send_closure(SelfId, &WaitBlockState::start);
td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link);
}
});
run_check_proof_link_query(handle_->id(), R.move_as_ok(), manager_, timeout_, std::move(P));
}

void WaitBlockState::got_proof(td::BufferSlice data) {
if (!waiting_proof_) {
return;
}
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
if (R.is_ok()) {
td::actor::send_closure(SelfId, &WaitBlockState::start);
td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof);
} else {
LOG(INFO) << "received bad proof link: " << R.move_as_error();
td::actor::send_closure(SelfId, &WaitBlockState::start);
td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof);
}
});
td::actor::send_closure(manager_, &ValidatorManager::validate_block_proof, handle_->id(), std::move(data),
Expand Down
20 changes: 18 additions & 2 deletions validator/downloaders/wait-block-state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,9 @@ class WaitBlockState : public td::actor::Actor {
void force_read_from_db();

void start_up() override;
void got_block_handle(BlockHandle handle);
void start();
void got_state_from_db(td::Ref<ShardState> data);
void got_state_from_static_file(td::Ref<ShardState> state, td::BufferSlice data);
void failed_to_get_state_from_db(td::Status reason);
void got_prev_state(td::Ref<ShardState> state);
void failed_to_get_prev_state(td::Status reason);
void got_block_data(td::Ref<BlockData> data);
Expand All @@ -68,6 +66,22 @@ class WaitBlockState : public td::actor::Actor {
priority_ = priority;
}

// These two methods can be called from ValidatorManagerImpl::written_handle
void after_get_proof_link() {
if (!waiting_proof_link_) {
return;
}
waiting_proof_link_ = false;
start();
}
void after_get_proof() {
if (!waiting_proof_) {
return;
}
waiting_proof_ = false;
start();
}

private:
BlockHandle handle_;

Expand All @@ -81,6 +95,8 @@ class WaitBlockState : public td::actor::Actor {
td::Ref<BlockData> block_;

bool reading_from_db_ = false;
bool waiting_proof_link_ = false;
bool waiting_proof_ = false;
td::Timestamp next_static_file_attempt_;

td::PerfWarningTimer perf_timer_;
Expand Down
175 changes: 175 additions & 0 deletions validator/full-node-private-overlay.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once

#include "full-node-private-overlay.hpp"
#include "ton/ton-tl.hpp"
#include "common/delay.h"

namespace ton {

namespace validator {

namespace fullnode {

void FullNodePrivateOverlay::process_broadcast(PublicKeyHash, ton_api::tonNode_blockBroadcast &query) {
std::vector<BlockSignature> signatures;
for (auto &sig : query.signatures_) {
signatures.emplace_back(BlockSignature{sig->who_, std::move(sig->signature_)});
}

BlockIdExt block_id = create_block_id(query.id_);
BlockBroadcast B{block_id,
std::move(signatures),
static_cast<UnixTime>(query.catchain_seqno_),
static_cast<td::uint32>(query.validator_set_hash_),
std::move(query.data_),
std::move(query.proof_)};

auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
LOG(DEBUG) << "dropped broadcast: " << R.move_as_error();
} else {
LOG(INFO) << "dropped broadcast: " << R.move_as_error();
}
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, std::move(B),
std::move(P));
}

void FullNodePrivateOverlay::process_broadcast(PublicKeyHash, ton_api::tonNode_newShardBlockBroadcast &query) {
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block,
create_block_id(query.block_->block_), query.block_->cc_seqno_,
std::move(query.block_->data_));
}

void FullNodePrivateOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
return;
}

ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); });
}

void FullNodePrivateOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) {
if (!inited_) {
return;
}
auto B = create_serialize_tl_object<ton_api::tonNode_newShardBlockBroadcast>(
create_tl_object<ton_api::tonNode_newShardBlock>(create_tl_block_id(block_id), cc_seqno, std::move(data)));
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(B));
} else {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
}
}

void FullNodePrivateOverlay::send_broadcast(BlockBroadcast broadcast) {
if (!inited_) {
return;
}
std::vector<tl_object_ptr<ton_api::tonNode_blockSignature>> sigs;
for (auto &sig : broadcast.signatures) {
sigs.emplace_back(create_tl_object<ton_api::tonNode_blockSignature>(sig.node, sig.signature.clone()));
}
auto B = create_serialize_tl_object<ton_api::tonNode_blockBroadcast>(
create_tl_block_id(broadcast.block_id), broadcast.catchain_seqno, broadcast.validator_set_hash, std::move(sigs),
broadcast.proof.clone(), broadcast.data.clone());
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
}

void FullNodePrivateOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());

std::vector<td::Bits256> nodes;
for (const adnl::AdnlNodeIdShort &id : nodes_) {
nodes.push_back(id.bits256_value());
}
auto X = create_hash_tl_object<ton_api::tonNode_privateBlockOverlayId>(zero_state_file_hash_, std::move(nodes));
td::BufferSlice b{32};
b.as_slice().copy_from(as_slice(X));
overlay_id_full_ = overlay::OverlayIdFull{std::move(b)};
overlay_id_ = overlay_id_full_.compute_short_id();

try_init();
}

void FullNodePrivateOverlay::try_init() {
// Sometimes adnl id is added to validator engine later (or not at all)
td::actor::send_closure(
adnl_, &adnl::Adnl::check_id_exists, local_id_, [SelfId = actor_id(this)](td::Result<bool> R) {
if (R.is_ok() && R.ok()) {
td::actor::send_closure(SelfId, &FullNodePrivateOverlay::init);
} else {
delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateOverlay::try_init); },
td::Timestamp::in(30.0));
}
});
}

void FullNodePrivateOverlay::init() {
LOG(FULL_NODE_INFO) << "Creating private block overlay for adnl id " << local_id_ << " : " << nodes_.size()
<< " nodes";
class Callback : public overlay::Overlays::Callback {
public:
void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
}
void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) override {
}
void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
td::actor::send_closure(node_, &FullNodePrivateOverlay::receive_broadcast, src, std::move(data));
}
void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
td::Promise<td::Unit> promise) override {
}
Callback(td::actor::ActorId<FullNodePrivateOverlay> node) : node_(node) {
}

private:
td::actor::ActorId<FullNodePrivateOverlay> node_;
};

overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(),
overlay::CertificateFlags::AllowFec | overlay::CertificateFlags::Trusted,
{}};
td::actor::send_closure(overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(),
nodes_, std::make_unique<Callback>(actor_id(this)), rules);

td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_);
td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_);
inited_ = true;
}

void FullNodePrivateOverlay::tear_down() {
if (inited_) {
td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, local_id_, overlay_id_);
}
}

} // namespace fullnode

} // namespace validator

} // namespace ton
Loading

0 comments on commit 59927ba

Please sign in to comment.