# feat(en): Support Merkle tree recovery with pruning enabled (#3172)
## What ❔

Generalizes Merkle tree recovery so that it supports recovery after the
node has pruned some of its data.

## Why ❔

Improves node UX; allows recovering the Merkle tree of a full (non-archive) node.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zkstack dev fmt` and `zkstack dev
lint`.
slowli authored Nov 6, 2024
1 parent 8a3a82c commit 7b8640a
Showing 9 changed files with 352 additions and 121 deletions.

4 changes: 2 additions & 2 deletions core/lib/dal/src/storage_logs_dal.rs
@@ -727,7 +727,7 @@ impl StorageLogsDal<'_, '_> {
FROM
storage_logs
WHERE
storage_logs.miniblock_number = $1
storage_logs.miniblock_number <= $1
AND storage_logs.hashed_key >= u.start_key
AND storage_logs.hashed_key <= u.end_key
ORDER BY
@@ -784,7 +784,7 @@ impl StorageLogsDal<'_, '_> {
storage_logs
INNER JOIN initial_writes ON storage_logs.hashed_key = initial_writes.hashed_key
WHERE
storage_logs.miniblock_number = $1
storage_logs.miniblock_number <= $1
AND storage_logs.hashed_key >= $2::bytea
AND storage_logs.hashed_key <= $3::bytea
ORDER BY
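The only functional change in this file is the comparison on `miniblock_number` switching from `=` to `<=`. When recovering from a snapshot alone, the logs read by tree recovery sit at the snapshot L2 block itself, so an exact match sufficed; once pruning is involved, the surviving log for a key can sit at any block at or below the recovery target, so the query has to pick up the latest log per key up to that block. The following self-contained sketch illustrates that selection rule in memory; the `Log` struct and `latest_logs_at_or_before` helper are made up for illustration and are not part of the DAL.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Log {
    block: u32,
    value: u64,
}

/// For every key, keep the most recent log written at or before `target_block`.
/// This mirrors the `miniblock_number <= $1` semantics introduced by this change.
fn latest_logs_at_or_before(logs: &[(u64, Log)], target_block: u32) -> HashMap<u64, Log> {
    let mut latest = HashMap::new();
    for &(key, log) in logs {
        if log.block > target_block {
            continue; // written after the recovery target; irrelevant for recovery
        }
        latest
            .entry(key)
            .and_modify(|current: &mut Log| {
                if log.block > current.block {
                    *current = log;
                }
            })
            .or_insert(log);
    }
    latest
}

fn main() {
    // Key 1 was last written at block 3; key 2 at block 2 and again at block 7.
    let logs = [
        (1, Log { block: 3, value: 10 }),
        (2, Log { block: 2, value: 20 }),
        (2, Log { block: 7, value: 21 }),
    ];
    // Recovering at block 5: the block-7 write must be ignored, and the block-2
    // and block-3 writes are picked up even though neither sits exactly at block 5.
    let recovered = latest_logs_at_or_before(&logs, 5);
    assert_eq!(recovered[&1], Log { block: 3, value: 10 });
    assert_eq!(recovered[&2], Log { block: 2, value: 20 });
    println!("{recovered:?}");
}
```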
1 change: 1 addition & 0 deletions core/node/metadata_calculator/src/pruning.rs
@@ -304,6 +304,7 @@ mod tests {
extend_db_state_from_l1_batch(
&mut storage,
snapshot_recovery.l1_batch_number + 1,
snapshot_recovery.l2_block_number + 1,
new_logs,
)
.await;
156 changes: 101 additions & 55 deletions core/node/metadata_calculator/src/recovery/mod.rs
@@ -32,16 +32,14 @@ use std::{
};

use anyhow::Context as _;
use async_trait::async_trait;
use futures::future;
use tokio::sync::{watch, Mutex, Semaphore};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
use zksync_health_check::HealthUpdater;
use zksync_merkle_tree::TreeEntry;
use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
use zksync_types::{
snapshots::{uniform_hashed_keys_chunk, SnapshotRecoveryStatus},
L2BlockNumber, H256,
};
use zksync_types::{snapshots::uniform_hashed_keys_chunk, L1BatchNumber, L2BlockNumber, H256};

use super::{
helpers::{AsyncTree, AsyncTreeRecovery, GenericAsyncTree, MerkleTreeHealth},
@@ -54,12 +52,13 @@ mod tests;

/// Handler of recovery life cycle events. This functionality is encapsulated in a trait to be able
/// to control recovery behavior in tests.
#[async_trait]
trait HandleRecoveryEvent: fmt::Debug + Send + Sync {
fn recovery_started(&mut self, _chunk_count: u64, _recovered_chunk_count: u64) {
// Default implementation does nothing
}

fn chunk_recovered(&self) {
async fn chunk_recovered(&self) {
// Default implementation does nothing
}
}
@@ -82,6 +81,7 @@ impl<'a> RecoveryHealthUpdater<'a> {
}
}

#[async_trait]
impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
fn recovery_started(&mut self, chunk_count: u64, recovered_chunk_count: u64) {
self.chunk_count = chunk_count;
@@ -91,7 +91,7 @@ impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
.set(recovered_chunk_count);
}

fn chunk_recovered(&self) {
async fn chunk_recovered(&self) {
let recovered_chunk_count = self.recovered_chunk_count.fetch_add(1, Ordering::SeqCst) + 1;
let chunks_left = self.chunk_count.saturating_sub(recovered_chunk_count);
tracing::info!(
@@ -110,34 +110,68 @@ impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
}

#[derive(Debug, Clone, Copy)]
struct SnapshotParameters {
struct InitParameters {
l1_batch: L1BatchNumber,
l2_block: L2BlockNumber,
expected_root_hash: H256,
expected_root_hash: Option<H256>,
log_count: u64,
desired_chunk_size: u64,
}

impl SnapshotParameters {
impl InitParameters {
async fn new(
pool: &ConnectionPool<Core>,
recovery: &SnapshotRecoveryStatus,
config: &MetadataCalculatorRecoveryConfig,
) -> anyhow::Result<Self> {
let l2_block = recovery.l2_block_number;
let expected_root_hash = recovery.l1_batch_root_hash;

) -> anyhow::Result<Option<Self>> {
let mut storage = pool.connection_tagged("metadata_calculator").await?;
let recovery_status = storage
.snapshot_recovery_dal()
.get_applied_snapshot_status()
.await?;
let pruning_info = storage.pruning_dal().get_pruning_info().await?;

let (l1_batch, l2_block);
let mut expected_root_hash = None;
match (recovery_status, pruning_info.last_hard_pruned_l2_block) {
(Some(recovery), None) => {
tracing::warn!(
"Snapshot recovery {recovery:?} is present on the node, but pruning info is empty; assuming no pruning happened"
);
l1_batch = recovery.l1_batch_number;
l2_block = recovery.l2_block_number;
expected_root_hash = Some(recovery.l1_batch_root_hash);
}
(Some(recovery), Some(pruned_l2_block)) => {
// We have both recovery and some pruning on top of it.
l2_block = pruned_l2_block.max(recovery.l2_block_number);
l1_batch = pruning_info
.last_hard_pruned_l1_batch
.with_context(|| format!("malformed pruning info: {pruning_info:?}"))?;
if l1_batch == recovery.l1_batch_number {
expected_root_hash = Some(recovery.l1_batch_root_hash);
}
}
(None, Some(pruned_l2_block)) => {
l2_block = pruned_l2_block;
l1_batch = pruning_info
.last_hard_pruned_l1_batch
.with_context(|| format!("malformed pruning info: {pruning_info:?}"))?;
}
(None, None) => return Ok(None),
};

let log_count = storage
.storage_logs_dal()
.get_storage_logs_row_count(l2_block)
.await?;

Ok(Self {
Ok(Some(Self {
l1_batch,
l2_block,
expected_root_hash,
log_count,
desired_chunk_size: config.desired_chunk_size,
})
}))
}

fn chunk_count(&self) -> u64 {
@@ -168,47 +202,44 @@ impl GenericAsyncTree {
stop_receiver: &watch::Receiver<bool>,
) -> anyhow::Result<Option<AsyncTree>> {
let started_at = Instant::now();
let (tree, snapshot_recovery) = match self {
let (tree, init_params) = match self {
Self::Ready(tree) => return Ok(Some(tree)),
Self::Recovering(tree) => {
let snapshot_recovery = get_snapshot_recovery(main_pool).await?.context(
let params = InitParameters::new(main_pool, config).await?.context(
"Merkle tree is recovering, but Postgres doesn't contain snapshot recovery information",
)?;
let recovered_version = tree.recovered_version();
anyhow::ensure!(
u64::from(snapshot_recovery.l1_batch_number.0) == recovered_version,
"Snapshot L1 batch in Postgres ({snapshot_recovery:?}) differs from the recovered Merkle tree version \
u64::from(params.l1_batch.0) == recovered_version,
"Snapshot L1 batch in Postgres ({params:?}) differs from the recovered Merkle tree version \
({recovered_version})"
);
tracing::info!("Resuming tree recovery with status: {snapshot_recovery:?}");
(tree, snapshot_recovery)
tracing::info!("Resuming tree recovery with status: {params:?}");
(tree, params)
}
Self::Empty { db, mode } => {
if let Some(snapshot_recovery) = get_snapshot_recovery(main_pool).await? {
tracing::info!(
"Starting Merkle tree recovery with status {snapshot_recovery:?}"
);
let l1_batch = snapshot_recovery.l1_batch_number;
if let Some(params) = InitParameters::new(main_pool, config).await? {
tracing::info!("Starting Merkle tree recovery with status {params:?}");
let l1_batch = params.l1_batch;
let tree = AsyncTreeRecovery::new(db, l1_batch.0.into(), mode, config)?;
(tree, snapshot_recovery)
(tree, params)
} else {
// Start the tree from scratch. The genesis block will be filled in `TreeUpdater::loop_updating_tree()`.
return Ok(Some(AsyncTree::new(db, mode)?));
}
}
};

let snapshot = SnapshotParameters::new(main_pool, &snapshot_recovery, config).await?;
tracing::debug!(
"Obtained snapshot parameters: {snapshot:?} based on recovery configuration {config:?}"
"Obtained recovery init parameters: {init_params:?} based on recovery configuration {config:?}"
);
let recovery_options = RecoveryOptions {
chunk_count: snapshot.chunk_count(),
chunk_count: init_params.chunk_count(),
concurrency_limit: recovery_pool.max_size() as usize,
events: Box::new(RecoveryHealthUpdater::new(health_updater)),
};
let tree = tree
.recover(snapshot, recovery_options, &recovery_pool, stop_receiver)
.recover(init_params, recovery_options, &recovery_pool, stop_receiver)
.await?;
if tree.is_some() {
// Only report latency if recovery wasn't canceled
@@ -223,12 +254,12 @@ impl GenericAsyncTree {
impl AsyncTreeRecovery {
async fn recover(
mut self,
snapshot: SnapshotParameters,
init_params: InitParameters,
mut options: RecoveryOptions<'_>,
pool: &ConnectionPool<Core>,
stop_receiver: &watch::Receiver<bool>,
) -> anyhow::Result<Option<AsyncTree>> {
self.ensure_desired_chunk_size(snapshot.desired_chunk_size)
self.ensure_desired_chunk_size(init_params.desired_chunk_size)
.await?;

let start_time = Instant::now();
@@ -237,13 +268,15 @@ impl AsyncTreeRecovery {
.map(|chunk_id| uniform_hashed_keys_chunk(chunk_id, chunk_count))
.collect();
tracing::info!(
"Recovering Merkle tree from Postgres snapshot in {chunk_count} chunks with max concurrency {}",
"Recovering Merkle tree from Postgres snapshot in {chunk_count} chunks with max concurrency {}. \
Be aware that enabling node pruning during recovery will probably result in a recovery error; always disable pruning \
until recovery is complete",
options.concurrency_limit
);

let mut storage = pool.connection_tagged("metadata_calculator").await?;
let remaining_chunks = self
.filter_chunks(&mut storage, snapshot.l2_block, &chunks)
.filter_chunks(&mut storage, init_params.l2_block, &chunks)
.await?;
drop(storage);
options
@@ -261,9 +294,10 @@ impl AsyncTreeRecovery {
.acquire()
.await
.context("semaphore is never closed")?;
if Self::recover_key_chunk(&tree, snapshot.l2_block, chunk, pool, stop_receiver).await?
if Self::recover_key_chunk(&tree, init_params.l2_block, chunk, pool, stop_receiver)
.await?
{
options.events.chunk_recovered();
options.events.chunk_recovered().await;
}
anyhow::Ok(())
});
@@ -279,13 +313,18 @@ impl AsyncTreeRecovery {

let finalize_latency = RECOVERY_METRICS.latency[&RecoveryStage::Finalize].start();
let actual_root_hash = tree.root_hash().await;
anyhow::ensure!(
actual_root_hash == snapshot.expected_root_hash,
"Root hash of recovered tree {actual_root_hash:?} differs from expected root hash {:?}. \
If pruning is enabled and the tree is initialized some time after node recovery, \
this is caused by snapshot storage logs getting pruned; this setup is currently not supported",
snapshot.expected_root_hash
);
if let Some(expected_root_hash) = init_params.expected_root_hash {
anyhow::ensure!(
actual_root_hash == expected_root_hash,
"Root hash of recovered tree {actual_root_hash:?} differs from expected root hash {expected_root_hash:?}"
);
}

// Check pruning info one last time before finalizing the tree.
let mut storage = pool.connection_tagged("metadata_calculator").await?;
Self::check_pruning_info(&mut storage, init_params.l2_block).await?;
drop(storage);

let tree = tree.finalize().await?;
finalize_latency.observe();
tracing::info!(
@@ -340,6 +379,21 @@ impl AsyncTreeRecovery {
Ok(output)
}

async fn check_pruning_info(
storage: &mut Connection<'_, Core>,
snapshot_l2_block: L2BlockNumber,
) -> anyhow::Result<()> {
let pruning_info = storage.pruning_dal().get_pruning_info().await?;
if let Some(last_hard_pruned_l2_block) = pruning_info.last_hard_pruned_l2_block {
anyhow::ensure!(
last_hard_pruned_l2_block == snapshot_l2_block,
"Additional data was pruned compared to tree recovery L2 block #{snapshot_l2_block}: {pruning_info:?}. \
Continuing recovery is impossible; to recover the tree, drop its RocksDB directory, stop pruning and restart recovery"
);
}
Ok(())
}

/// Returns `Ok(true)` if the chunk was recovered, `Ok(false)` if the recovery process was interrupted.
async fn recover_key_chunk(
tree: &Mutex<AsyncTreeRecovery>,
@@ -363,7 +417,9 @@ impl AsyncTreeRecovery {
.storage_logs_dal()
.get_tree_entries_for_l2_block(snapshot_l2_block, key_chunk.clone())
.await?;
Self::check_pruning_info(&mut storage, snapshot_l2_block).await?;
drop(storage);

let entries_latency = entries_latency.observe();
tracing::debug!(
"Loaded {} entries for chunk {key_chunk:?} in {entries_latency:?}",
@@ -414,13 +470,3 @@ impl AsyncTreeRecovery {
Ok(true)
}
}

async fn get_snapshot_recovery(
pool: &ConnectionPool<Core>,
) -> anyhow::Result<Option<SnapshotRecoveryStatus>> {
let mut storage = pool.connection_tagged("metadata_calculator").await?;
Ok(storage
.snapshot_recovery_dal()
.get_applied_snapshot_status()
.await?)
}
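
The heart of the change is `InitParameters::new`: the recovery target is now derived from the snapshot recovery status, the pruning info, or both, and the root-hash check only happens when a trusted hash is actually available (it isn't once pruning has moved past the snapshot batch, or when there was no snapshot at all). Below is a compact, standalone distillation of that decision table; `SnapshotRecovery`, `PruningInfo`, and `InitParams` are simplified stand-ins for illustration, not the node's real DAL types.

```rust
#[derive(Debug, Clone, Copy)]
struct SnapshotRecovery {
    l1_batch: u32,
    l2_block: u32,
    root_hash: [u8; 32],
}

#[derive(Debug, Clone, Copy)]
struct PruningInfo {
    last_hard_pruned_l1_batch: Option<u32>,
    last_hard_pruned_l2_block: Option<u32>,
}

#[derive(Debug, Clone, Copy, PartialEq)]
struct InitParams {
    l1_batch: u32,
    l2_block: u32,
    /// Only present when the recovered tree can be checked against a known root hash.
    expected_root_hash: Option<[u8; 32]>,
}

fn init_params(
    recovery: Option<SnapshotRecovery>,
    pruning: PruningInfo,
) -> Result<Option<InitParams>, String> {
    match (recovery, pruning.last_hard_pruned_l2_block) {
        // Snapshot recovery, no pruning: recover at the snapshot point and verify its root hash.
        (Some(rec), None) => Ok(Some(InitParams {
            l1_batch: rec.l1_batch,
            l2_block: rec.l2_block,
            expected_root_hash: Some(rec.root_hash),
        })),
        // Snapshot recovery with pruning on top: take whichever point is later; the snapshot
        // root hash is only usable if pruning hasn't moved past the snapshot batch.
        (Some(rec), Some(pruned_l2_block)) => {
            let l1_batch = pruning
                .last_hard_pruned_l1_batch
                .ok_or_else(|| format!("malformed pruning info: {pruning:?}"))?;
            Ok(Some(InitParams {
                l1_batch,
                l2_block: pruned_l2_block.max(rec.l2_block),
                expected_root_hash: (l1_batch == rec.l1_batch).then_some(rec.root_hash),
            }))
        }
        // Pruning only (e.g. a genesis-started node that pruned later): recover at the pruning
        // point; there is no precomputed root hash to check against.
        (None, Some(pruned_l2_block)) => {
            let l1_batch = pruning
                .last_hard_pruned_l1_batch
                .ok_or_else(|| format!("malformed pruning info: {pruning:?}"))?;
            Ok(Some(InitParams {
                l1_batch,
                l2_block: pruned_l2_block,
                expected_root_hash: None,
            }))
        }
        // Neither: nothing to recover from; the tree is built from the genesis batch instead.
        (None, None) => Ok(None),
    }
}

fn main() {
    // A node initialized from genesis that has since pruned up to L1 batch 42 / L2 block 1000.
    let pruning = PruningInfo {
        last_hard_pruned_l1_batch: Some(42),
        last_hard_pruned_l2_block: Some(1_000),
    };
    let params = init_params(None, pruning).unwrap().unwrap();
    assert_eq!(params, InitParams { l1_batch: 42, l2_block: 1_000, expected_root_hash: None });
    println!("{params:?}");
}
```

The last arm of the example is the case this PR unlocks: with no snapshot root hash to verify against, the pruning-info re-checks during chunk recovery and before finalization act as the consistency guard instead.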