fix(en): gracefully shutdown en waiting for reorg detector (#270)
This PR changes the shutdown order of the external node to a more
graceful one.

# Why?

Currently, shutting down the EN because of reorgs and inconsistency checks may be racy: a shutting-down consistency checker may bring down the reorg detector with it, which would prevent the reorg detector from doing its job in the legitimate cases of reorgs where the node actually needs to be rolled back to a previous state.

# What happens currently?

The concerns have focused on the logic of the consistency checker, which used to panic when an inconsistency was found. That is no longer the case (the `anyhow::bail!` used to be a `panic!`):


https://github.com/matter-labs/zksync-era/blob/feb8a6c7053cc5e0202088f6a1f7644316e1ad05/core/lib/zksync_core/src/consistency_checker/mod.rs#L181-L183

However, this error would still unconditionally bring the node down, because the handle for `ConsistencyChecker` is currently part of `wait_for_tasks`, which resolves as soon as any of the tasks resolves:


https://github.com/matter-labs/zksync-era/blob/feb8a6c7053cc5e0202088f6a1f7644316e1ad05/core/bin/external_node/src/main.rs#L401-L404
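For illustration, a minimal sketch of that behaviour — assuming a `select_all`-style implementation with placeholder names, not the actual `wait_for_tasks` from this repository:

```rust
use tokio::task::JoinHandle;

/// Resolves as soon as *any* of the component tasks resolves, Ok and Err
/// alike — so a consistency checker bailing out is enough to stop the node.
async fn wait_for_any_task(task_handles: Vec<JoinHandle<anyhow::Result<()>>>) {
    let (result, index, _remaining) = futures::future::select_all(task_handles).await;
    eprintln!("task #{index} resolved with {result:?}; shutting the node down");
}
```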

The proposed logic works as follows: the handle to the reorg detector is fused, and on node shutdown, after all the components have been gracefully shut down, we manually check the result of the reorg detector (see the sketch after the list). This logic handles both situations:

1. the node is shutting down;
2. the reorg detector has found a reorg and is performing a rollback and shutdown.
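A self-contained sketch of the resulting shutdown order; the task bodies, batch number, and timings below are placeholders for illustration, not the real EN components:

```rust
use std::time::Duration;

use futures::{future::FusedFuture, FutureExt};

#[tokio::main]
async fn main() {
    // Stand-in for the reorg detector: resolves with `Some(batch)` if a reorg
    // was localized, or `None` on a clean shutdown.
    let mut reorg_detector_handle = tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        Some(42u32)
    })
    .fuse();

    // Stand-in for `wait_for_tasks`: resolves once any component task exits.
    let other_tasks = tokio::time::sleep(Duration::from_millis(10));

    let mut last_correct_batch = None;
    tokio::select! {
        _ = other_tasks => println!("a component exited, shutting down"),
        res = &mut reorg_detector_handle => {
            if let Ok(batch) = res {
                last_correct_batch = batch;
            }
        }
    }

    // ... gracefully stop and await all other components here ...

    // Only poll the detector again if `select!` did not already consume it;
    // this is what `.fuse()` / `is_terminated()` make safe.
    if !reorg_detector_handle.is_terminated() {
        if let Ok(Some(batch)) = reorg_detector_handle.await {
            last_correct_batch = Some(batch);
        }
    }

    if let Some(batch) = last_correct_batch {
        println!("performing rollback to L1 batch #{batch}");
    }
}
```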
montekki authored Oct 26, 2023
1 parent d1034b0 commit f048485
Showing 5 changed files with 72 additions and 55 deletions.
30 changes: 17 additions & 13 deletions .github/workflows/ci-core-reusable.yml
@@ -265,18 +265,22 @@ jobs:
- name: Run Cross EN Checker
run: ci_run zk run cross-en-checker

# - name: Run revert test
# run: |
# ci_run pkill zksync_server || true
# ci_run sleep 2
# ci_run zk test i revert
# # Check that the rollback was performed on the EN
# ci_run grep -q 'Rollback successfully completed' ext-node.log
# # Restart the EN
# ci_run zk server &>>server.log &
# ci_run sleep 30
# ZKSYNC_ENV=ext-node-docker ci_run zk external-node &>>ext-node.log &
# ci_run sleep 30
- name: Run revert test
run: |
ci_run zk env
ci_run zk env docker
ci_run pkill zksync_server || true
ci_run sleep 2
ci_run zk env
ci_run zk test i revert
# Check that the rollback was performed on the EN
ci_run sleep 20
ci_run grep -q 'Rollback successfully completed' ext-node.log
# Restart the EN
ci_run zk server &>>server.log &
ci_run sleep 30
ZKSYNC_ENV=ext-node-docker ci_run zk external-node &>>ext-node.log &
ci_run sleep 30
- name: Run upgrade test
run: |
@@ -288,7 +292,7 @@
run: |
ci_run cat server.log
ci_run cat ext-node.log
# ci_run cat core/tests/revert-test/revert.log
ci_run cat core/tests/revert-test/revert.log
ci_run cat core/tests/upgrade-test/upgrade.log
ci_run sccache --show-stats
ci_run cat /tmp/sccache_log.txt
3 changes: 0 additions & 3 deletions core/bin/external_node/src/config/mod.rs
@@ -201,9 +201,6 @@ pub struct OptionalENConfig {
// Other config settings
/// Port on which the Prometheus exporter server is listening.
pub prometheus_port: Option<u16>,
/// Whether to try running EN with MultiVM.
#[serde(default)]
pub experimental_multivm_support: bool,
/// Number of keys that is processed by enum_index migration in State Keeper each L1 batch.
#[serde(default = "OptionalENConfig::default_enum_index_migration_chunk_size")]
pub enum_index_migration_chunk_size: usize,
66 changes: 37 additions & 29 deletions core/bin/external_node/src/main.rs
@@ -4,6 +4,7 @@ use tokio::{sync::watch, task, time::sleep};

use std::{sync::Arc, time::Duration};

use futures::{future::FusedFuture, FutureExt};
use prometheus_exporter::PrometheusExporterConfig;
use zksync_basic_types::{Address, L2ChainId};
use zksync_core::{
@@ -94,6 +95,7 @@ async fn init_tasks(
Vec<task::JoinHandle<anyhow::Result<()>>>,
watch::Sender<bool>,
HealthCheckHandle,
watch::Receiver<bool>,
)> {
let main_node_url = config
.required
@@ -186,12 +188,7 @@ async fn init_tasks(
let tree_handle =
task::spawn(metadata_calculator.run(tree_pool, prover_tree_pool, tree_stop_receiver));

let consistency_checker_handle = if !config.optional.experimental_multivm_support {
Some(tokio::spawn(consistency_checker.run(stop_receiver.clone())))
} else {
// TODO (BFT-264): Current behavior of consistency checker makes development of MultiVM harder.
None
};
let consistency_checker_handle = tokio::spawn(consistency_checker.run(stop_receiver.clone()));

let updater_handle = task::spawn(batch_status_updater.run(stop_receiver.clone()));
let sk_handle = task::spawn(state_keeper.run());
@@ -288,11 +285,9 @@ async fn init_tasks(
tree_handle,
gas_adjuster_handle,
]);
if let Some(consistency_checker) = consistency_checker_handle {
task_handles.push(consistency_checker);
}
task_handles.push(consistency_checker_handle);

Ok((task_handles, stop_sender, healthcheck_handle))
Ok((task_handles, stop_sender, healthcheck_handle, stop_receiver))
}

async fn shutdown_components(
@@ -402,46 +397,59 @@ async fn main() -> anyhow::Result<()> {
.await
.context("Performing genesis failed")?;

let (task_handles, stop_sender, health_check_handle) =
let (task_handles, stop_sender, health_check_handle, stop_receiver) =
init_tasks(config.clone(), connection_pool.clone())
.await
.context("init_tasks")?;

let reorg_detector = ReorgDetector::new(&main_node_url, connection_pool.clone());
let reorg_detector_handle = tokio::spawn(reorg_detector.run());
let reorg_detector = ReorgDetector::new(&main_node_url, connection_pool.clone(), stop_receiver);
let mut reorg_detector_handle = tokio::spawn(reorg_detector.run()).fuse();

let particular_crypto_alerts = None;
let graceful_shutdown = None::<futures::future::Ready<()>>;
let tasks_allowed_to_finish = false;
let mut reorg_detector_last_correct_batch = None;

tokio::select! {
_ = wait_for_tasks(task_handles, particular_crypto_alerts, graceful_shutdown, tasks_allowed_to_finish) => {},
_ = sigint_receiver => {
tracing::info!("Stop signal received, shutting down");
},
last_correct_batch = reorg_detector_handle => {
last_correct_batch = &mut reorg_detector_handle => {
if let Ok(last_correct_batch) = last_correct_batch {
tracing::info!("Performing rollback to block {}", last_correct_batch);
shutdown_components(stop_sender, health_check_handle).await;
let reverter = BlockReverter::new(
config.required.state_cache_path,
config.required.merkle_tree_path,
None,
connection_pool,
L1ExecutedBatchesRevert::Allowed,
);
reverter
.rollback_db(last_correct_batch, BlockReverterFlags::all())
.await;
tracing::info!("Rollback successfully completed, the node has to restart to continue working");
return Ok(());
reorg_detector_last_correct_batch = last_correct_batch;
} else {
tracing::error!("Reorg detector actor failed");
}
}
}
};

// Reaching this point means that either some actor exited unexpectedly or we received a stop signal.
// Broadcast the stop signal to all actors and exit.
shutdown_components(stop_sender, health_check_handle).await;

if !reorg_detector_handle.is_terminated() {
if let Ok(Some(last_correct_batch)) = reorg_detector_handle.await {
reorg_detector_last_correct_batch = Some(last_correct_batch);
}
}

if let Some(last_correct_batch) = reorg_detector_last_correct_batch {
tracing::info!("Performing rollback to block {}", last_correct_batch);
let reverter = BlockReverter::new(
config.required.state_cache_path,
config.required.merkle_tree_path,
None,
connection_pool,
L1ExecutedBatchesRevert::Allowed,
);
reverter
.rollback_db(last_correct_batch, BlockReverterFlags::all())
.await;
tracing::info!(
"Rollback successfully completed, the node has to restart to continue working"
);
}

Ok(())
}
24 changes: 18 additions & 6 deletions core/lib/zksync_core/src/reorg_detector/mod.rs
@@ -1,5 +1,6 @@
use std::{future::Future, time::Duration};

use tokio::sync::watch;
use zksync_dal::ConnectionPool;
use zksync_types::{L1BatchNumber, MiniblockNumber};
use zksync_web3_decl::{
@@ -31,14 +32,19 @@ const SLEEP_INTERVAL: Duration = Duration::from_secs(5);
pub struct ReorgDetector {
client: HttpClient,
pool: ConnectionPool,
should_stop: watch::Receiver<bool>,
}

impl ReorgDetector {
pub fn new(url: &str, pool: ConnectionPool) -> Self {
pub fn new(url: &str, pool: ConnectionPool, should_stop: watch::Receiver<bool>) -> Self {
let client = HttpClientBuilder::default()
.build(url)
.expect("Failed to create HTTP client");
Self { client, pool }
Self {
client,
pool,
should_stop,
}
}

/// Compares hashes of the given local miniblock and the same miniblock from main node.
@@ -118,7 +124,7 @@ impl ReorgDetector {
.map(L1BatchNumber)
}

pub async fn run(self) -> L1BatchNumber {
pub async fn run(mut self) -> Option<L1BatchNumber> {
loop {
match self.run_inner().await {
Ok(l1_batch_number) => return l1_batch_number,
@@ -134,8 +140,10 @@ }
}
}

async fn run_inner(&self) -> RpcResult<L1BatchNumber> {
async fn run_inner(&mut self) -> RpcResult<Option<L1BatchNumber>> {
loop {
let should_stop = *self.should_stop.borrow();

let sealed_l1_batch_number = self
.pool
.access_storage()
@@ -176,7 +184,6 @@ impl ReorgDetector {
.set(sealed_l1_batch_number.0.into());
EN_METRICS.last_correct_miniblock[&CheckerComponent::ReorgDetector]
.set(sealed_miniblock_number.0.into());
tokio::time::sleep(SLEEP_INTERVAL).await;
} else {
if !root_hashes_match {
tracing::warn!(
@@ -195,8 +202,13 @@ tracing::info!(
tracing::info!(
"Reorg localized: last correct L1 batch is #{last_correct_l1_batch}"
);
return Ok(last_correct_l1_batch);
return Ok(Some(last_correct_l1_batch));
}
if should_stop {
tracing::info!("Shutting down reorg detector");
return Ok(None);
}
tokio::time::sleep(SLEEP_INTERVAL).await;
}
}
}
4 changes: 0 additions & 4 deletions etc/env/ext-node.toml
@@ -32,10 +32,6 @@ api_namespaces = ["eth", "web3", "net", "pubsub", "zks", "en", "debug"]
bootloader_hash="0x0100038581be3d0e201b3cc45d151ef5cc59eb3a0f146ad44f0f72abf00b594c"
default_aa_hash="0x0100038dc66b69be75ec31653c64cb931678299b9b659472772b2550b703f41c"

# Whether to try using MultiVM
# WARN: Only to be used locally until the feature is fully tested.
experimental_multivm_support = false

[rust]
# `RUST_LOG` environmnet variable for `env_logger`
# Here we use TOML multiline strings: newlines will be trimmed.