Skip to content

Commit

Permalink
fix(validator): session signatures storage #274
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Nov 11, 2024
1 parent 57e6f23 commit 161a241
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 53 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ rlimit = "0.10.1"
rustc_version = "0.4"
rustls = "0.23.16"
rustls-webpki = "0.102"
scc = "2.1"
scopeguard = "1.2"
serde = "1.0"
serde_json = "1.0.114"
Expand Down Expand Up @@ -235,4 +234,4 @@ opt-level = 3
[profile.dev.package.hashbrown]
opt-level = 3
[profile.dev.package."*"]
opt-level = 1
opt-level = 1
4 changes: 3 additions & 1 deletion collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ metrics = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
scc = { workspace = true }
scopeguard = { workspace = true }
serde = { workspace = true }
sha2 = { workspace = true }
Expand Down Expand Up @@ -65,3 +64,6 @@ block-creator-stats = []

[lints]
workspace = true

[profile.test]
incremental = true
85 changes: 36 additions & 49 deletions collator/src/validator/impls/std_impl/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ use everscale_crypto::ed25519::KeyPair;
use everscale_types::models::*;
use futures_util::stream::FuturesUnordered;
use futures_util::{Future, StreamExt};
use scc::TreeIndex;
use tokio::sync::{Notify, Semaphore};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use tycho_network::{OverlayId, PeerId, PrivateOverlay, Request};
use tycho_util::futures::JoinTask;
use tycho_util::metrics::HistogramGuard;
use tycho_util::FastHashMap;
use tycho_util::{FastDashMap, FastHashMap};

use super::ValidatorStdImplConfig;
use crate::tracing_targets;
Expand Down Expand Up @@ -87,8 +86,8 @@ impl ValidatorSession {
shard_ident: info.shard_ident,
weight_threshold,
validators: Arc::new(validators),
block_signatures: TreeIndex::new(),
cached_signatures: TreeIndex::new(),
block_signatures: FastDashMap::default(),
cached_signatures: FastDashMap::default(),
cancelled: AtomicBool::new(false),
cancelled_signal: Notify::new(),
});
Expand Down Expand Up @@ -153,17 +152,17 @@ impl ValidatorSession {
.fetch_max(block_seqno, Ordering::Release);

let state = self.inner.state.as_ref();
state.cached_signatures.remove_range(..=block_seqno);

let guard = scc::ebr::Guard::new();
for (_, validation) in state.block_signatures.range(..=block_seqno, &guard) {
validation.cancelled.cancel();
}
drop(guard);
state.cached_signatures.retain(|&key, _| key > block_seqno);

// NOTE: Remove only blocks that are old enough.
let until_seqno = block_seqno.saturating_sub(self.inner.config.old_blocks_to_keep);
state.block_signatures.remove_range(..=until_seqno);

state.block_signatures.retain(|&key, validation| {
if key <= block_seqno {
validation.cancelled.cancel();
}
key > until_seqno
});
}

#[tracing::instrument(
Expand All @@ -184,41 +183,36 @@ impl ValidatorSession {

let state = &self.inner.state;

// Remove cached slot
let entry = state.block_signatures.entry(block_id.seqno);
if let tycho_util::DashMapEntry::Occupied(_) = entry {
anyhow::bail!(
"block validation is already in progress. \
session_id={}, block_id={:?}",
self.inner.session_id,
block_id
);
}

let cached = state
.cached_signatures
.peek(&block_id.seqno, &scc::ebr::Guard::new())
.map(Arc::clone);
.remove(&block_id.seqno)
.map(|(_, value)| value);

// Prepare block signatures
let block_signatures = match &cached {
Some(cached) => self.reuse_signatures(block_id, cached.clone()).await,
None => self.prepare_new_signatures(block_id),
}
.build(block_id, state.weight_threshold);
let block_signatures = {
match &cached {
Some(cached) => self.reuse_signatures(block_id, cached.clone()).await,
None => self.prepare_new_signatures(block_id),
}
.build(block_id, state.weight_threshold)
};

// Allow only one validation at a time
if state
.block_signatures
.insert(block_id.seqno, block_signatures.clone())
.is_err()
{
// TODO: Panic here?
anyhow::bail!(
"block validation is already in progress. \
session_id={}, block_id={block_id}",
self.inner.session_id
);
}
entry.or_insert(block_signatures.clone());

// NOTE: To eliminate the gap inside exchange routine, we can remove cached signatures
// only after we have inserted the block.
//
// At this point the following is true:
// - All new signatures will be stored (and validated) in the block;
// - There might be some new signatures that were stored in the cache, but we
// have not yet processed them. We will use them later.
state.cached_signatures.remove(&block_id.seqno);

// Start collecting signatures from other validators
let mut result = FastHashMap::default();
Expand Down Expand Up @@ -604,8 +598,8 @@ struct SessionState {
shard_ident: ShardIdent,
weight_threshold: u64,
validators: Arc<FastHashMap<PeerId, BriefValidatorDescr>>,
block_signatures: TreeIndex<u32, Arc<BlockSignatures>>,
cached_signatures: TreeIndex<u32, Arc<CachedSignatures>>,
block_signatures: FastDashMap<u32, Arc<BlockSignatures>>,
cached_signatures: FastDashMap<u32, Arc<CachedSignatures>>,
cancelled: AtomicBool,
cancelled_signal: Notify,
}
Expand Down Expand Up @@ -758,14 +752,9 @@ impl ExchangeSignatures for SessionState {
if self.cancelled.load(Ordering::Acquire) {
return Err(ValidationError::Cancelled);
}

let guard = scc::ebr::Guard::new();

// Full signature exchange if we know the block.
// Otherwise, cache the signature for the block to use it later.
//
// NOTE: scc's `peek` does not lock the tree
let result = if let Some(signatures) = self.block_signatures.peek(&block_seqno, &guard) {
let result = if let Some(signatures) = self.block_signatures.get(&block_seqno) {
metrics::counter!(METRIC_BLOCK_EXCHANGES_IN_TOTAL).increment(1);

let Some(slot) = signatures.other_signatures.get(peer_id) else {
Expand All @@ -774,13 +763,13 @@ impl ExchangeSignatures for SessionState {

// If more signatures are still needed, validate and store new to the block
if !signatures.validated.load(Ordering::Acquire) {
self.add_signature(signatures, slot, peer_id, &signature)?;
self.add_signature(&signatures, slot, peer_id, &signature)?;
}

proto::Exchange::Complete(signatures.own_signature.clone())
} else {
// Find the slot for the specified block seqno.
let Some(slot) = self.cached_signatures.peek(&block_seqno, &guard) else {
let Some(slot) = self.cached_signatures.get(&block_seqno) else {
metrics::counter!(METRIC_MISS_EXCHANGES_IN_TOTAL).increment(1);
return Err(ValidationError::NoSlot);
};
Expand All @@ -796,8 +785,6 @@ impl ExchangeSignatures for SessionState {
proto::Exchange::Cached
};

drop(guard);

let action = match &result {
proto::Exchange::Complete(_) => "complete",
proto::Exchange::Cached => "cached",
Expand Down

0 comments on commit 161a241

Please sign in to comment.