Skip to content

Commit

Permalink
fix(db): Fix root cause of RocksDB misbehavior (#301)
Browse files Browse the repository at this point in the history
# What ❔

Fixes (hopefully) the root cause of recent RocksDB misbehavior. After
recent refactoring, it improperly cancels background compactions /
flushes each time a `RocksDB` instance is dropped, which leads to
compactions / flushes being constantly interrupted. (After refactoring,
`RocksDB` instances are `Clone`eable and act essentially as `Arc`
wrappers; a new instance is created and dropped, in particular, [when
processing](https://github.com/matter-labs/zksync-era/blob/ec41692063909742005acd656694a8f0838fcf33/core/lib/zksync_core/src/metadata_calculator/updater.rs#L385)
each chunk of L1 batches.) A proper solution, implemented by this PR, is
to cancel background work when dropping an internal, non-`Arc`'d DB
instance.

## Why ❔

That's a bug that makes RocksDB behave *very* unstably, as witnessed by
recent issues with it.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Code has been formatted via `zk fmt` and `zk lint`.

---------

Co-authored-by: Igor Borodin <[email protected]>
  • Loading branch information
slowli and hatemosphere authored Oct 26, 2023
1 parent c1f2f5e commit d6c30ab
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 11 deletions.
30 changes: 21 additions & 9 deletions core/lib/storage/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,16 @@ impl RocksDBInner {
}
}

impl Drop for RocksDBInner {
fn drop(&mut self) {
tracing::debug!(
"Canceling background compactions / flushes for DB `{}`",
self.db_name
);
self.db.cancel_all_background_work(true);
}
}

/// Configuration for retries when RocksDB writes are stalled.
#[derive(Debug, Clone, Copy)]
pub struct StalledWritesRetries {
Expand Down Expand Up @@ -448,13 +458,21 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {

let raw_batch_bytes = raw_batch.data().to_vec();
let mut retries = self.stalled_writes_retries.intervals();
let mut stalled_write_reported = false;
let started_at = Instant::now();
loop {
match self.write_inner(raw_batch) {
Ok(()) => return Ok(()),
Ok(()) => {
if stalled_write_reported {
METRICS.observe_stalled_write_duration(CF::DB_NAME, started_at.elapsed());
}
return Ok(());
}
Err(err) => {
let is_stalled_write = StalledWritesRetries::is_write_stall_error(&err);
if is_stalled_write {
METRICS.report_stalled_write(CF::DB_NAME);
if is_stalled_write && !stalled_write_reported {
METRICS.observe_stalled_write(CF::DB_NAME);
stalled_write_reported = true;
} else {
return Err(err);
}
Expand Down Expand Up @@ -552,12 +570,6 @@ impl RocksDB<()> {
}
}

impl<CF> Drop for RocksDB<CF> {
fn drop(&mut self) {
self.inner.db.cancel_all_background_work(true);
}
}

/// Empty struct used to register rocksdb instance
#[derive(Debug)]
struct RegistryEntry;
Expand Down
20 changes: 18 additions & 2 deletions core/lib/storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use vise::{Buckets, Collector, Counter, EncodeLabelSet, Family, Gauge, Histogram
use std::{
collections::HashMap,
sync::{Mutex, Weak},
time::Duration,
};

use crate::db::RocksDBInner;
Expand Down Expand Up @@ -41,18 +42,33 @@ pub(crate) struct RocksdbMetrics {
/// Size of a serialized `WriteBatch` written to a RocksDB instance.
#[metrics(buckets = BYTE_SIZE_BUCKETS)]
write_batch_size: Family<DbLabel, Histogram<usize>>,
/// Number of stalled writes for a RocksDB instance.
/// Number of independent stalled writes for a RocksDB instance.
// The counter is similar for the counter in `stalled_write_duration` histogram, but is reported earlier
// (immediately when stalled write is encountered, rather than when it's resolved).
write_stalled: Family<DbLabel, Counter>,
/// Total duration of a stalled writes instance for a RocksDB instance. Naturally, this only reports
/// stalled writes that were resolved in time (otherwise, the stall error is propagated, which
/// leads to a panic).
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
stalled_write_duration: Family<DbLabel, Histogram<Duration>>,
}

impl RocksdbMetrics {
pub(crate) fn report_batch_size(&self, db: &'static str, batch_size: usize) {
self.write_batch_size[&db.into()].observe(batch_size);
}

pub(crate) fn report_stalled_write(&self, db: &'static str) {
pub(crate) fn observe_stalled_write(&self, db: &'static str) {
self.write_stalled[&db.into()].inc();
}

pub(crate) fn observe_stalled_write_duration(
&self,
db: &'static str,
stall_duration: Duration,
) {
self.stalled_write_duration[&db.into()].observe(stall_duration);
}
}

#[vise::register]
Expand Down

0 comments on commit d6c30ab

Please sign in to comment.