Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage)!: reduce remove_blocks IO #472

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions storage/src/db/kv_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ weedb::tables! {
pub archive_block_ids: tables::ArchiveBlockIds,
pub block_handles: tables::BlockHandles,
pub key_blocks: tables::KeyBlocks,
pub full_block_ids: tables::FullBlockIds,
pub package_entries: tables::PackageEntries,
pub block_data_entries: tables::BlockDataEntries,
pub shard_states: tables::ShardStates,
Expand Down
13 changes: 13 additions & 0 deletions storage/src/db/kv_db/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ impl ColumnFamily for KeyBlocks {

impl ColumnFamilyOptions<Caches> for KeyBlocks {}

/// Maps a partial block id to the block's file hash.
///
/// - Key: serialized `PartialBlockId` — workchain (4 bytes, BE), shard prefix
///   (8 bytes, BE), seqno (4 bytes, BE), root hash (32 bytes); 48 bytes total.
/// - Value: 32-byte file hash (written from `id.file_hash` when a new handle
///   is stored — presumably the only writer; verify against other callers).
pub struct FullBlockIds;

impl ColumnFamily for FullBlockIds {
    const NAME: &'static str = "full_block_ids";

    fn read_options(opts: &mut rocksdb::ReadOptions) {
        // Skip per-read checksum verification for this CF (cheaper reads).
        opts.set_verify_checksums(false);
    }
}

// NOTE(review): options are left at defaults; reviewer suggests wiring the
// shared caches via `default_block_based_table_factory` — TODO confirm.
impl ColumnFamilyOptions<Caches> for FullBlockIds {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set caches via default_block_based_table_factory


/// Maps package entry id to entry data
/// - Key: `BlockIdShort (16 bytes), [u8; 32], package type (1 byte)` <=> (`PackageEntryKey`)
/// - Value: `Vec<u8>` (block/proof/queue diff data)
Expand Down
47 changes: 24 additions & 23 deletions storage/src/store/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl BlockStorage {
if !handle.has_data() {
self.add_data_ext(&archive_id, data)?;
if handle.meta().add_flags(BlockFlags::HAS_DATA) {
self.block_handle_storage.store_handle(&handle);
self.block_handle_storage.store_handle(&handle, false);
updated = true;
}
}
Expand Down Expand Up @@ -532,7 +532,7 @@ impl BlockStorage {
if !handle.has_proof() {
self.add_data(&archive_id, data)?;
if handle.meta().add_flags(BlockFlags::HAS_PROOF) {
self.block_handle_storage.store_handle(&handle);
self.block_handle_storage.store_handle(&handle, false);
updated = true;
}
}
Expand Down Expand Up @@ -599,7 +599,7 @@ impl BlockStorage {
if !handle.has_queue_diff() {
self.add_data(&archive_id, data)?;
if handle.meta().add_flags(BlockFlags::HAS_QUEUE_DIFF) {
self.block_handle_storage.store_handle(&handle);
self.block_handle_storage.store_handle(&handle, false);
updated = true;
}
}
Expand Down Expand Up @@ -838,8 +838,8 @@ impl BlockStorage {
let db = self.db.clone();

let BlockGcStats {
mc_entries_removed,
total_entries_removed,
mc_blocks_removed,
total_blocks_removed,
} = rayon_run(move || {
let _span = span.enter();

Expand All @@ -862,8 +862,8 @@ impl BlockStorage {

tracing::info!(
total_cached_handles_removed,
mc_entries_removed,
total_entries_removed,
mc_blocks_removed,
total_blocks_removed,
"finished blocks GC"
);
Ok(())
Expand Down Expand Up @@ -1364,6 +1364,7 @@ fn remove_blocks(
let mut stats = BlockGcStats::default();

let raw = db.rocksdb().as_ref();
let full_block_ids_cf = db.full_block_ids.cf();
let block_connections_cf = db.block_connections.cf();
let package_entries_cf = db.package_entries.cf();
let block_data_entries_cf = db.block_data_entries.cf();
Expand All @@ -1373,13 +1374,12 @@ fn remove_blocks(
let mut batch = rocksdb::WriteBatch::default();
let mut batch_len = 0;

let package_entries_readopts = db.package_entries.new_read_config();
let block_handles_readopts = db.block_handles.new_read_config();

// Iterate all entries and find expired items
let mut blocks_iter = raw.raw_iterator_cf_opt(&package_entries_cf, package_entries_readopts);
let mut blocks_iter =
raw.raw_iterator_cf_opt(&full_block_ids_cf, db.full_block_ids.new_read_config());
blocks_iter.seek_to_first();

let block_handles_readopts = db.block_handles.new_read_config();
let is_persistent = |root_hash: &[u8; 32]| -> Result<bool> {
const FLAGS: u64 =
((BlockFlags::IS_KEY_BLOCK.bits() | BlockFlags::IS_PERSISTENT.bits()) as u64) << 32;
Expand Down Expand Up @@ -1412,6 +1412,7 @@ fn remove_blocks(
//
// It will delete all entries in range [from_seqno, to_seqno) for this shard.
// Note that package entry keys are the same as block connection keys.
batch.delete_range_cf(&full_block_ids_cf, &*range_from, &range_to);
batch.delete_range_cf(&package_entries_cf, &*range_from, &range_to);
batch.delete_range_cf(&block_data_entries_cf, &*range_from, &range_to);
batch.delete_range_cf(&block_connections_cf, &*range_from, &range_to);
Expand Down Expand Up @@ -1473,9 +1474,9 @@ fn remove_blocks(
}

// Count entry
stats.total_entries_removed += 1;
stats.total_blocks_removed += 1;
if is_masterchain {
stats.mc_entries_removed += 1;
stats.mc_blocks_removed += 1;
}

batch.delete_cf(&block_handles_cf, root_hash);
Expand All @@ -1486,7 +1487,7 @@ fn remove_blocks(
Some(max_blocks_per_batch) if batch_len >= max_blocks_per_batch
) {
tracing::info!(
total_package_entries_removed = stats.total_entries_removed,
total_blocks_removed = stats.total_blocks_removed,
"applying intermediate batch",
);
let batch = std::mem::take(&mut batch);
Expand All @@ -1513,8 +1514,8 @@ fn remove_blocks(

/// Counters produced by a blocks-GC pass over expired blocks.
///
/// The PR renames these from `*_entries_removed` to `*_blocks_removed`:
/// with the `full_block_ids` index, GC now counts removed *blocks* (one per
/// block id) rather than removed package *entries* (several per block).
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
pub struct BlockGcStats {
    /// Number of masterchain blocks removed.
    pub mc_blocks_removed: usize,
    /// Total number of blocks removed across all shards (including masterchain).
    pub total_blocks_removed: usize,
}

struct FullBlockDataGuard<'a> {
Expand Down Expand Up @@ -1735,7 +1736,7 @@ mod tests {
}

handle.meta().add_flags(BlockFlags::HAS_ALL_BLOCK_PARTS);
block_handles.store_handle(&handle);
block_handles.store_handle(&handle, false);
}
}

Expand All @@ -1748,8 +1749,8 @@ mod tests {
None,
)?;
assert_eq!(stats, BlockGcStats {
mc_entries_removed: 69 * ENTRY_TYPES.len(),
total_entries_removed: (69 + 49) * ENTRY_TYPES.len(),
mc_blocks_removed: 69,
total_blocks_removed: 69 + 49,
});

let removed_ranges = HashMap::from_iter([
Expand Down Expand Up @@ -1794,8 +1795,8 @@ mod tests {
None,
)?;
assert_eq!(stats, BlockGcStats {
mc_entries_removed: ENTRY_TYPES.len(),
total_entries_removed: 2 * ENTRY_TYPES.len(),
mc_blocks_removed: 1,
total_blocks_removed: 2,
});

// Remove no blocks
Expand All @@ -1807,8 +1808,8 @@ mod tests {
None,
)?;
assert_eq!(stats, BlockGcStats {
mc_entries_removed: 0,
total_entries_removed: 0,
mc_blocks_removed: 0,
total_blocks_removed: 0,
});

Ok(())
Expand Down
36 changes: 36 additions & 0 deletions storage/src/store/block/package_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,42 @@ impl From<&BlockId> for PartialBlockId {
}
}

impl StoredValue for PartialBlockId {
    /// workchain (4) + shard prefix (8) + seqno (4) + root hash (32).
    const SIZE_HINT: usize = 4 + 8 + 4 + 32;

    type OnStackSlice = [u8; Self::SIZE_HINT];

    /// Serializes as a fixed 48-byte big-endian key so that the RocksDB
    /// byte order matches (workchain, shard, seqno) ordering.
    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
        let mut result = [0; Self::SIZE_HINT];
        result[..4].copy_from_slice(&self.shard.workchain().to_be_bytes());
        result[4..12].copy_from_slice(&self.shard.prefix().to_be_bytes());
        result[12..16].copy_from_slice(&self.seqno.to_be_bytes());
        result[16..48].copy_from_slice(self.root_hash.as_slice());

        buffer.write_raw_slice(&result);
    }

    /// Reads one 48-byte id and advances `reader` past the consumed prefix.
    ///
    /// # Panics
    /// Panics if `reader` holds fewer than [`Self::SIZE_HINT`] bytes.
    fn deserialize(reader: &mut &[u8]) -> Self
    where
        Self: Sized,
    {
        // Require *at least* one full record instead of an exact-length match:
        // the reader is advanced below, which only makes sense if trailing
        // bytes are permitted (e.g. when this id is a prefix of a larger key).
        assert!(reader.len() >= Self::SIZE_HINT, "invalid partial id");

        let workchain = i32::from_be_bytes(reader[..4].try_into().unwrap());
        let prefix = u64::from_be_bytes(reader[4..12].try_into().unwrap());
        let seqno = u32::from_be_bytes(reader[12..16].try_into().unwrap());
        let root_hash = HashBytes::from_slice(&reader[16..48]);

        // Consume exactly one record; leave any trailing bytes for the caller.
        *reader = &reader[Self::SIZE_HINT..];

        Self {
            shard: ShardIdent::new(workchain, prefix).expect("invalid shard ident"),
            seqno,
            root_hash,
        }
    }
}

/// Package entry id.
#[derive(Debug, Hash, Eq, PartialEq)]
pub struct PackageEntryKey {
Expand Down
22 changes: 3 additions & 19 deletions storage/src/store/block_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,14 @@ impl BlockConnectionStorage {
let block_handle_cf = &self.db.block_handles.cf();
let rocksdb = self.db.rocksdb();

if handle.is_key_block() {
let mut write_batch = weedb::rocksdb::WriteBatch::default();

write_batch.merge_cf(
block_handle_cf,
id.root_hash.as_slice(),
handle.meta().to_vec(),
);
write_batch.put_cf(
&self.db.key_blocks.cf(),
id.seqno.to_be_bytes(),
id.to_vec(),
);

rocksdb.write(write_batch)
} else {
rocksdb.merge_cf_opt(
rocksdb
.merge_cf_opt(
block_handle_cf,
id.root_hash.as_slice(),
handle.meta().to_vec(),
self.db.block_handles.write_config(),
)
}
.unwrap();
.unwrap();
}

pub fn load_connection(
Expand Down
62 changes: 44 additions & 18 deletions storage/src/store/block_handle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub(crate) use self::handle::BlockDataGuard;
pub use self::handle::{BlockHandle, WeakBlockHandle};
pub use self::meta::{BlockFlags, BlockMeta, LoadedBlockMeta, NewBlockMeta};
use crate::db::*;
use crate::store::PartialBlockId;
use crate::util::*;

mod handle;
Expand All @@ -31,15 +32,15 @@ impl BlockHandleStorage {
pub fn set_block_applied(&self, handle: &BlockHandle) -> bool {
let updated = handle.meta().add_flags(BlockFlags::IS_APPLIED);
if updated {
self.store_handle(handle);
self.store_handle(handle, false);
}
updated
}

pub fn set_block_persistent(&self, handle: &BlockHandle) -> bool {
let updated = handle.meta().add_flags(BlockFlags::IS_PERSISTENT);
if updated {
self.store_handle(handle);
self.store_handle(handle, false);
}
updated
}
Expand All @@ -49,7 +50,7 @@ impl BlockHandleStorage {
.meta()
.add_flags(BlockFlags::HAS_PERSISTENT_SHARD_STATE);
if updated {
self.store_handle(handle);
self.store_handle(handle, false);
}
updated
}
Expand All @@ -59,7 +60,7 @@ impl BlockHandleStorage {
.meta()
.add_flags(BlockFlags::HAS_PERSISTENT_QUEUE_STATE);
if updated {
self.store_handle(handle);
self.store_handle(handle, false);
}
updated
}
Expand Down Expand Up @@ -100,21 +101,23 @@ impl BlockHandleStorage {
);

// Fill the cache with the new handle
match self.cache.entry(*block_id) {
let is_new = match self.cache.entry(*block_id) {
Entry::Vacant(entry) => {
entry.insert(handle.downgrade());
true
}
Entry::Occupied(mut entry) => match entry.get().upgrade() {
// Another thread has created the handle
Some(handle) => return (handle, HandleCreationStatus::Fetched),
None => {
entry.insert(handle.downgrade());
true
}
},
};

// Store the handle in the storage
self.store_handle(&handle);
self.store_handle(&handle, is_new);

// Done
(handle, HandleCreationStatus::Created)
Expand Down Expand Up @@ -142,25 +145,48 @@ impl BlockHandleStorage {
Some(self.fill_cache(block_id, meta))
}

pub fn store_handle(&self, handle: &BlockHandle) {
pub fn store_handle(&self, handle: &BlockHandle, is_new: bool) {
let id = handle.id();

self.db
.rocksdb()
.merge_cf_opt(
let is_key_block = handle.is_key_block();

if is_new || is_key_block {
let mut batch = weedb::rocksdb::WriteBatch::default();

batch.merge_cf(
&self.db.block_handles.cf(),
id.root_hash.as_slice(),
id.root_hash,
handle.meta().to_vec(),
self.db.block_handles.write_config(),
)
.unwrap();
);

if is_new {
batch.put_cf(
&self.db.full_block_ids.cf(),
PartialBlockId::from(id).to_vec(),
id.file_hash,
);
}

if is_key_block {
batch.put_cf(
&self.db.key_blocks.cf(),
id.seqno.to_be_bytes(),
id.to_vec(),
);
}

if handle.is_key_block() {
self.db
.key_blocks
.insert(id.seqno.to_be_bytes(), id.to_vec())
.unwrap();
.rocksdb()
.write_opt(batch, self.db.block_handles.write_config())
} else {
self.db.rocksdb().merge_cf_opt(
&self.db.block_handles.cf(),
id.root_hash,
handle.meta().to_vec(),
self.db.block_handles.write_config(),
)
}
.unwrap();
}

pub fn load_key_block_handle(&self, seqno: u32) -> Option<BlockHandle> {
Expand Down
Loading
Loading