Rocksdb config (#234)
* Add rlimit dependency and refactor database connection builder

Added the `rlimit` crate as a new dependency. `rlimit` dynamically determines the open-file limit for the host platform, which matters in environments where the maximum file limit exceeds 1024. Furthermore, the `open_db` function in the `db` module was refactored and moved into a new `conn_builder` module. This restructuring allows greater adaptability and customization when establishing a database connection: it is now possible to enable statistics, set a memory budget, adjust parallelism, and toggle database creation when none exists.
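The file-limit logic described above can be sketched as follows: the configured limit is capped by whatever the platform reports. This is a minimal sketch in plain Rust; `platform_files_limit` stands in for the platform query (the commit uses `rlimit::getrlimit(Resource::NOFILE)` on unix and `rlimit::getmaxstdio()` on windows), and the helper name is hypothetical.

```rust
use std::cmp::min;

/// Cap the configured max-open-files value by the platform's reported limit.
/// `platform_files_limit` is a stand-in for the rlimit query result.
fn effective_files_limit(configured: i32, platform_files_limit: i32) -> i32 {
    min(configured, platform_files_limit)
}

fn main() {
    // Typical Linux soft limit of 1024 with the default configured cap of 500.
    println!("{}", effective_files_limit(500, 1024)); // prints 500
}
```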

* Enhance rocksdb configuration in the simulation environment

SimPa's `network.rs` and many test files have been modified to provide more flexibility in configuring the underlying RocksDB. The number of threads, the memory budget, the file descriptor limit, and the statistics period can now be tweaked at runtime via command-line arguments. The functions that create and load database instances were also refactored into macros for better maintainability, and some unused import statements were cleaned up. Together these changes provide better control over the simulation and test environment and help optimize resource usage.
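Runtime tuning of this kind can be sketched as a small argument parser feeding the database knobs listed above. This is an illustrative std-only sketch; the flag names and the `DbArgs` struct are hypothetical, not the exact simpa arguments.

```rust
// Hypothetical CLI knobs mirroring the tunables described in the commit message.
#[derive(Debug)]
struct DbArgs {
    rocksdb_threads: usize,
    rocksdb_mem_budget: usize,
    rocksdb_files_limit: i32,
    rocksdb_stats_period_sec: Option<u32>,
}

fn parse_db_args(mut args: impl Iterator<Item = String>) -> DbArgs {
    // Defaults match the values used elsewhere in this commit.
    let mut out = DbArgs {
        rocksdb_threads: 1,
        rocksdb_mem_budget: 64 << 20,
        rocksdb_files_limit: 500,
        rocksdb_stats_period_sec: None,
    };
    while let Some(a) = args.next() {
        match a.as_str() {
            "--rocksdb-threads" => out.rocksdb_threads = args.next().unwrap().parse().unwrap(),
            "--rocksdb-mem-budget" => out.rocksdb_mem_budget = args.next().unwrap().parse().unwrap(),
            "--rocksdb-files-limit" => out.rocksdb_files_limit = args.next().unwrap().parse().unwrap(),
            "--rocksdb-stats-period" => {
                out.rocksdb_stats_period_sec = Some(args.next().unwrap().parse().unwrap())
            }
            _ => {} // ignore unrelated arguments
        }
    }
    out
}

fn main() {
    let args = ["--rocksdb-threads", "4", "--rocksdb-stats-period", "60"]
        .iter()
        .map(|s| s.to_string());
    let parsed = parse_db_args(args);
    println!("threads={} stats={:?}", parsed.rocksdb_threads, parsed.rocksdb_stats_period_sec);
}
```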

* Refactor database creation and stats handling

Refactored the code for creating and setting up the RocksDB databases to use match expressions instead of nested if-let statements. This simplification enhances code readability and maintainability, and makes it easier to track the conditions for creating permanent and temporary databases and enabling their statistics. The same refactoring was also applied to the logic for loading an existing database.
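The shape of that refactor can be sketched as a single match over a tuple of options instead of nesting if-let arms. The function and option names here are hypothetical, purely to illustrate the pattern.

```rust
// One match over both options replaces two levels of nested if-let,
// making every combination of conditions explicit and exhaustive.
fn describe(create_perm: Option<&str>, create_temp: Option<&str>) -> String {
    match (create_perm, create_temp) {
        (Some(p), Some(t)) => format!("perm {p} and temp {t}"),
        (Some(p), None) => format!("perm {p} only"),
        (None, Some(t)) => format!("temp {t} only"),
        (None, None) => "no databases".to_string(),
    }
}

fn main() {
    println!("{}", describe(Some("consensus"), None)); // prints "perm consensus only"
}
```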

* Refactor ConnBuilder struct and remove StatsPeriod trait

The `ConnBuilder` struct has been refactored to use struct update syntax, reducing redundancy in method definitions while improving code readability. The `StatsPeriod` trait has also been removed since it was no longer needed, which simplifies the codebase; the associated `stats_period` implementations in the `ConnBuilder` methods were removed accordingly.
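The struct update syntax mentioned above lets a builder method set one field and carry the rest over with `..self`. A minimal sketch with a simplified, hypothetical field set:

```rust
#[derive(Debug, Clone)]
struct ConnBuilder {
    create_if_missing: bool,
    parallelism: usize,
    files_limit: i32,
}

impl ConnBuilder {
    fn with_parallelism(self, parallelism: usize) -> Self {
        // Instead of listing every field, copy the remainder from `self`.
        ConnBuilder { parallelism, ..self }
    }
}

fn main() {
    let b = ConnBuilder { create_if_missing: true, parallelism: 1, files_limit: 500 };
    let b = b.with_parallelism(8);
    println!("{} {} {}", b.create_if_missing, b.parallelism, b.files_limit);
}
```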
biryukovmaxim authored Aug 8, 2023
1 parent ce3e231 commit c69ac13
Showing 18 changed files with 291 additions and 85 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions components/addressmanager/src/lib.rs
@@ -358,7 +358,8 @@ mod address_store_with_cache {
use super::*;
use address_manager::AddressManager;
use kaspa_consensus_core::config::{params::SIMNET_PARAMS, Config};
use kaspa_database::utils::create_temp_db;
use kaspa_database::create_temp_db;
use kaspa_database::prelude::ConnBuilder;
use kaspa_utils::networking::IpAddress;
use statest::ks::KSTest;
use statrs::distribution::Uniform;
@@ -387,7 +388,7 @@ mod address_store_with_cache {
// Assert that initial distribution is skewed, and hence not uniform from the outset.
assert!(bucket_reduction_ratio >= 1.25);

let db = create_temp_db();
let db = create_temp_db!(ConnBuilder::default());
let config = Config::new(SIMNET_PARAMS);
let am = AddressManager::new(Arc::new(config), db.1);

4 changes: 2 additions & 2 deletions consensus/src/consensus/factory.rs
@@ -193,7 +193,7 @@ impl ConsensusFactory for Factory {
}
};
let dir = self.db_root_dir.join(entry.directory_name.clone());
let db = kaspa_database::prelude::open_db(dir, true, self.db_parallelism);
let db = kaspa_database::prelude::ConnBuilder::default().with_db_path(dir).with_parallelism(self.db_parallelism).build();

let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
@@ -218,7 +218,7 @@

let entry = self.management_store.write().new_staging_consensus_entry().unwrap();
let dir = self.db_root_dir.join(entry.directory_name);
let db = kaspa_database::prelude::open_db(dir, true, self.db_parallelism);
let db = kaspa_database::prelude::ConnBuilder::default().with_db_path(dir).with_parallelism(self.db_parallelism).build();

let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
8 changes: 5 additions & 3 deletions consensus/src/consensus/test_consensus.rs
@@ -8,10 +8,12 @@ use kaspa_consensus_core::{
use kaspa_consensus_notify::{notification::Notification, root::ConsensusNotificationRoot};
use kaspa_consensusmanager::{ConsensusFactory, ConsensusInstance, DynConsensusCtl};
use kaspa_core::{core::Core, service::Service};
use kaspa_database::utils::{create_temp_db, DbLifetime};
use kaspa_database::utils::DbLifetime;
use kaspa_hashes::Hash;
use parking_lot::RwLock;

use kaspa_database::create_temp_db;
use kaspa_database::prelude::ConnBuilder;
use std::future::Future;
use std::{sync::Arc, thread::JoinHandle};

@@ -56,7 +58,7 @@ impl TestConsensus {

/// Creates a test consensus instance based on `config` with a temp DB and the provided `notification_sender`
pub fn with_notifier(config: &Config, notification_sender: Sender<Notification>) -> Self {
let (db_lifetime, db) = create_temp_db();
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Arc::new(ProcessingCounters::default());
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters));
@@ -67,7 +69,7 @@ impl TestConsensus {

/// Creates a test consensus instance based on `config` with a temp DB and no notifier
pub fn new(config: &Config) -> Self {
let (db_lifetime, db) = create_temp_db();
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (dummy_notification_sender, _) = async_channel::unbounded();
let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
let counters = Arc::new(ProcessingCounters::default());
5 changes: 2 additions & 3 deletions consensus/src/processes/pruning_proof/mod.rs
@@ -22,7 +22,7 @@ use kaspa_consensus_core::{
BlockHashMap, BlockHashSet, BlockLevel, HashMapCustomHasher, KType,
};
use kaspa_core::{debug, info, trace};
use kaspa_database::prelude::{StoreResultEmptyTuple, StoreResultExtensions};
use kaspa_database::prelude::{ConnBuilder, StoreResultEmptyTuple, StoreResultExtensions};
use kaspa_hashes::Hash;
use kaspa_pow::calc_block_level;
use kaspa_utils::{binary_heap::BinaryHeapExtensions, vec::VecExtensions};
@@ -360,8 +360,7 @@ impl PruningProofManager {
let proof_pp_header = proof[0].last().expect("checked if empty");
let proof_pp = proof_pp_header.hash;
let proof_pp_level = calc_block_level(proof_pp_header, self.max_block_level);

let (db_lifetime, db) = kaspa_database::utils::create_temp_db();
let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default());
let headers_store = Arc::new(DbHeadersStore::new(db.clone(), 2 * self.pruning_proof_m)); // TODO: Think about cache size
let ghostdag_stores = (0..=self.max_block_level)
.map(|level| Arc::new(DbGhostdagStore::new(db.clone(), level, 2 * self.pruning_proof_m)))
7 changes: 4 additions & 3 deletions consensus/src/processes/reachability/inquirer.rs
@@ -266,7 +266,8 @@ mod tests {
};
use itertools::Itertools;
use kaspa_consensus_core::blockhash::ORIGIN;
use kaspa_database::utils::create_temp_db;
use kaspa_database::create_temp_db;
use kaspa_database::prelude::ConnBuilder;
use parking_lot::RwLock;
use rand::seq::IteratorRandom;
use rocksdb::WriteBatch;
@@ -384,7 +385,7 @@ mod tests {
/// Runs a DAG test-case with full verification using the staging store mechanism.
/// Note: runtime is quadratic in the number of blocks so should be used with mildly small DAGs (~50)
fn run_dag_test_case_with_staging(test: &DagTestCase) {
let (_lifetime, db) = create_temp_db();
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let cache_size = test.blocks.len() as u64 / 3;
let reachability = RwLock::new(DbReachabilityStore::new(db.clone(), cache_size));
let relations = RwLock::new(DbRelationsStore::with_prefix(db.clone(), &[], 0));
@@ -532,7 +533,7 @@ mod tests {
run_dag_test_case(&mut relations, &mut reachability, &test);

// Run with direct DB stores
let (_lifetime, db) = create_temp_db();
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let cache_size = test.blocks.len() as u64 / 3;
let mut reachability = DbReachabilityStore::new(db.clone(), cache_size);
let mut relations = DbRelationsStore::new(db, 0, cache_size);
5 changes: 3 additions & 2 deletions consensus/src/processes/relations.rs
@@ -164,12 +164,13 @@ mod tests {
use super::*;
use crate::model::stores::relations::{DbRelationsStore, RelationsStoreReader, StagingRelationsStore};
use kaspa_core::assert_match;
use kaspa_database::{prelude::MemoryWriter, utils::create_temp_db};
use kaspa_database::prelude::ConnBuilder;
use kaspa_database::{create_temp_db, prelude::MemoryWriter};
use std::sync::Arc;

#[test]
fn test_delete_level_relations_zero_cache() {
let (_lifetime, db) = create_temp_db();
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let cache_size = 0;
let mut relations = DbRelationsStore::new(db.clone(), 0, cache_size);
relations.insert(ORIGIN, Default::default()).unwrap();
1 change: 1 addition & 0 deletions database/Cargo.toml
@@ -24,3 +24,4 @@ tempfile.workspace = true

enum-primitive-derive = "0.2.2"
num-traits = "0.2.15"
rlimit = "0.10.1"
20 changes: 5 additions & 15 deletions database/src/db.rs
@@ -1,23 +1,13 @@
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::{path::PathBuf, sync::Arc};
use std::path::PathBuf;

pub use conn_builder::ConnBuilder;

mod conn_builder;

/// The DB type used for Kaspad stores
pub type DB = DBWithThreadMode<MultiThreaded>;

/// Creates or loads an existing DB from the provided directory path.
pub fn open_db(db_path: PathBuf, create_if_missing: bool, parallelism: usize) -> Arc<DB> {
let mut opts = rocksdb::Options::default();
if parallelism > 1 {
opts.increase_parallelism(parallelism as i32);
}
// In most linux environments the limit is set to 1024, so we use 500 to give sufficient slack.
// TODO: fine-tune this parameter and additional parameters related to max file size
opts.set_max_open_files(500);
opts.create_if_missing(create_if_missing);
let db = Arc::new(DB::open(&opts, db_path.to_str().unwrap()).unwrap());
db
}

/// Deletes an existing DB if it exists
pub fn delete_db(db_dir: PathBuf) {
if !db_dir.exists() {
140 changes: 140 additions & 0 deletions database/src/db/conn_builder.rs
@@ -0,0 +1,140 @@
use crate::db::DB;
use rlimit::Resource;
use std::cmp::min;
use std::path::PathBuf;
use std::sync::Arc;

#[derive(Debug, Copy, Clone)]
pub struct Unspecified;

#[derive(Debug, Clone)]
pub struct ConnBuilder<Path: Clone, const STATS_ENABLED: bool, StatsPeriod: Clone> {
db_path: Path,
create_if_missing: bool,
parallelism: usize,
files_limit: i32,
mem_budget: usize,
stats_period: StatsPeriod,
}

impl Default for ConnBuilder<Unspecified, false, Unspecified> {
fn default() -> Self {
ConnBuilder {
db_path: Unspecified,
create_if_missing: true,
parallelism: 1,
files_limit: 500,
mem_budget: 64 * 1024 * 1024,
stats_period: Unspecified,
}
}
}

impl<Path: Clone, const STATS_ENABLED: bool, StatsPeriod: Clone> ConnBuilder<Path, STATS_ENABLED, StatsPeriod> {
pub fn with_db_path(self, db_path: PathBuf) -> ConnBuilder<PathBuf, STATS_ENABLED, StatsPeriod> {
ConnBuilder {
db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: self.stats_period,
}
}
pub fn with_create_if_missing(self, create_if_missing: bool) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod> {
ConnBuilder { create_if_missing, ..self }
}
pub fn with_parallelism(self, parallelism: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod> {
ConnBuilder { parallelism: parallelism.into(), ..self }
}
pub fn with_files_limit(self, files_limit: impl Into<i32>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod> {
ConnBuilder { files_limit: files_limit.into(), ..self }
}
pub fn with_mem_budget(self, mem_budget: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod> {
ConnBuilder { mem_budget: mem_budget.into(), ..self }
}
}

impl<Path: Clone> ConnBuilder<Path, false, Unspecified> {
pub fn enable_stats(self) -> ConnBuilder<Path, true, Unspecified> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: self.stats_period,
}
}
}

impl<Path: Clone, StatsPeriod: Clone> ConnBuilder<Path, true, StatsPeriod> {
pub fn disable_stats(self) -> ConnBuilder<Path, false, Unspecified> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: Unspecified,
}
}
pub fn with_stats_period(self, stats_period: impl Into<u32>) -> ConnBuilder<Path, true, u32> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: stats_period.into(),
}
}
}

macro_rules! default_opts {
($self: expr) => {{
let mut opts = rocksdb::Options::default();
if $self.parallelism > 1 {
opts.increase_parallelism($self.parallelism as i32);
}
opts.optimize_level_style_compaction($self.mem_budget);

#[cfg(target_os = "windows")]
let files_limit = rlimit::getmaxstdio() as i32;
#[cfg(any(target_os = "macos", target_os = "linux"))]
let files_limit = rlimit::getrlimit(Resource::NOFILE).unwrap().0 as i32;
// In most linux environments the limit is set to 1024, so we use 500 to give sufficient slack.
// TODO: fine-tune this parameter and additional parameters related to max file size
opts.set_max_open_files(min(files_limit, $self.files_limit));
opts.create_if_missing($self.create_if_missing);
opts
}};
}

impl ConnBuilder<PathBuf, false, Unspecified> {
pub fn build(self) -> Arc<DB> {
let opts = default_opts!(self);
let db = Arc::new(DB::open(&opts, self.db_path.to_str().unwrap()).unwrap());
db
}
}

impl ConnBuilder<PathBuf, true, Unspecified> {
pub fn build(self) -> Arc<DB> {
let mut opts = default_opts!(self);
opts.enable_statistics();
let db = Arc::new(DB::open(&opts, self.db_path.to_str().unwrap()).unwrap());
db
}
}

impl ConnBuilder<PathBuf, true, u32> {
pub fn build(self) -> Arc<DB> {
let mut opts = default_opts!(self);
opts.enable_statistics();
opts.set_report_bg_io_stats(true);
opts.set_stats_dump_period_sec(self.stats_period);
let db = Arc::new(DB::open(&opts, self.db_path.to_str().unwrap()).unwrap());
db
}
}
2 changes: 1 addition & 1 deletion database/src/lib.rs
@@ -17,6 +17,6 @@ pub mod prelude {
pub use super::item::CachedDbItem;
pub use super::key::DbKey;
pub use super::writer::{BatchDbWriter, DbWriter, DirectDbWriter, DirectWriter, MemoryWriter};
pub use db::{delete_db, open_db, DB};
pub use db::{delete_db, ConnBuilder, DB};
pub use errors::{StoreError, StoreResult, StoreResultEmptyTuple, StoreResultExtensions};
}