diff --git a/Cargo.lock b/Cargo.lock index cfb1b7a..d801eaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -514,13 +514,14 @@ dependencies = [ "minicbor 0.24.2", "num-bigint", "num-rational", - "pallas-crypto 0.30.0 (git+https://github.com/txpipe/pallas?rev=d3084d6e3209ee0f507a413b1a2491e07abf3756)", + "pallas-crypto 0.30.0 (git+https://github.com/txpipe/pallas?rev=f82bc469264a7b209a97edef83f20b6849bf8315)", "pallas-network", "pallas-traverse", "pkg-config", "pretty_env_logger", "rand", "rayon", + "redb", "regex", "reqwest", "rusqlite", @@ -534,6 +535,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -1438,7 +1440,7 @@ dependencies = [ [[package]] name = "pallas-codec" version = "0.30.0" -source = "git+https://github.com/txpipe/pallas?rev=d3084d6e3209ee0f507a413b1a2491e07abf3756#d3084d6e3209ee0f507a413b1a2491e07abf3756" +source = "git+https://github.com/txpipe/pallas?rev=f82bc469264a7b209a97edef83f20b6849bf8315#f82bc469264a7b209a97edef83f20b6849bf8315" dependencies = [ "hex", "minicbor 0.20.0", @@ -1463,11 +1465,11 @@ dependencies = [ [[package]] name = "pallas-crypto" version = "0.30.0" -source = "git+https://github.com/txpipe/pallas?rev=d3084d6e3209ee0f507a413b1a2491e07abf3756#d3084d6e3209ee0f507a413b1a2491e07abf3756" +source = "git+https://github.com/txpipe/pallas?rev=f82bc469264a7b209a97edef83f20b6849bf8315#f82bc469264a7b209a97edef83f20b6849bf8315" dependencies = [ "cryptoxide", "hex", - "pallas-codec 0.30.0 (git+https://github.com/txpipe/pallas?rev=d3084d6e3209ee0f507a413b1a2491e07abf3756)", + "pallas-codec 0.30.0 (git+https://github.com/txpipe/pallas?rev=f82bc469264a7b209a97edef83f20b6849bf8315)", "rand_core", "serde", "thiserror", @@ -1826,6 +1828,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redb" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6dd20d3cdeb9c7d2366a0b16b93b35b75aec15309fbeb7ce477138c9f68c8c0" +dependencies = [ + "libc", +] + [[package]] name = "regex" version = "1.10.6" @@ -2460,6 +2471,15 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 16cd656..5d08366 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ blake2b_simd = "1.0" byteorder = "1.5" #pallas-network = { git = "https://github.com/AndrewWestberg/pallas", rev="35f693c57eec5f70c4f8e2f6a24445b14c6104b9" } #pallas-traverse = { git = "https://github.com/AndrewWestberg/pallas", rev="35f693c57eec5f70c4f8e2f6a24445b14c6104b9" } -pallas-crypto = { git = "https://github.com/txpipe/pallas", rev = "d3084d6e3209ee0f507a413b1a2491e07abf3756" } +pallas-crypto = { git = "https://github.com/txpipe/pallas", rev = "f82bc469264a7b209a97edef83f20b6849bf8315" } pallas-network = "0.30" pallas-traverse = "0.30" #pallas-crypto = "0.30.0" @@ -28,6 +28,7 @@ futures = "0.3" hex = "0.4" libc = "0.2" minicbor = { version = "0.24", features = ["std"] } +redb = "2.1.1" regex = "1.10" reqwest = { version = "0.12", default-features = false, features = ["blocking", "rustls-tls-webpki-roots", "rustls-tls", "json", "gzip", "deflate"] } rusqlite = { version = "0.32", features = ["bundled"] } @@ -44,6 
+45,7 @@ tokio = { version = "1", features = ["rt", "rt-multi-thread", "net", "io-util", thiserror = "1.0" tracing = "0.1" tracing-subscriber = "0.3" +uuid = { version = "1", features = ["v7"] } # logging diff --git a/src/lib.rs b/src/lib.rs index f70fa12..90f9306 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,544 +1,517 @@ -pub mod nodeclient { - use std::fs::File; - use std::io::{stdout, BufReader}; - use std::path::{Path, PathBuf}; - use std::str::FromStr; - use std::string::ParseError; - use std::thread; - use std::thread::JoinHandle; +use std::io::stdout; +use std::path::PathBuf; +use std::str::FromStr; +use std::string::ParseError; +use std::thread; +use std::thread::JoinHandle; - use serde::Deserialize; - use structopt::StructOpt; +use structopt::StructOpt; - use crate::nodeclient::leaderlog::handle_error; +use crate::nodeclient::leaderlog::handle_error; +use crate::nodeclient::sync::pooltool; +use crate::nodeclient::sync::pooltool::PooltoolConfig; +use crate::nodeclient::{leaderlog, ping, sign, snapshot, sync, validate}; - pub mod leaderlog; - pub mod math; - pub mod ping; - pub mod pooltool; - pub mod signing; - pub mod snapshot; - pub mod sqlite; - pub mod sync; - mod validate; +pub(crate) mod nodeclient; - pub static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); +pub static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); - #[derive(Debug)] - pub enum LedgerSet { - Mark, - Set, - Go, - } +#[derive(Debug)] +pub enum LedgerSet { + Mark, + Set, + Go, +} - impl FromStr for LedgerSet { - type Err = ParseError; - fn from_str(ledger_set: &str) -> Result { - match ledger_set { - "next" => Ok(LedgerSet::Mark), - "current" => Ok(LedgerSet::Set), - "prev" => Ok(LedgerSet::Go), - _ => Ok(LedgerSet::Set), - } +impl FromStr for LedgerSet { + type Err = ParseError; + fn from_str(ledger_set: &str) -> Result { + match ledger_set { + "next" => Ok(LedgerSet::Mark), + "current" => Ok(LedgerSet::Set), + "prev" => Ok(LedgerSet::Go), + _ => Ok(LedgerSet::Set), } } +} - #[derive(Debug, StructOpt)] - pub enum Command { - Ping { - #[structopt(short, long, help = "cardano-node hostname to connect to")] - host: String, - #[structopt(short, long, default_value = "3001", help = "cardano-node port")] - port: u16, - #[structopt(long, default_value = "764824073", help = "network magic.")] - network_magic: u64, - #[structopt(short, long, default_value = "2", help = "connect timeout in seconds")] - timeout_seconds: u64, - }, - Validate { - #[structopt(long, help = "full or partial block hash to validate")] - hash: String, - #[structopt( - parse(from_os_str), - short, - long, - default_value = "./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - }, - Sync { - #[structopt( - parse(from_os_str), - short, - long, - default_value = "./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - #[structopt(short, long, help = "cardano-node hostname to connect to")] - host: String, - #[structopt(short, long, default_value = "3001", help = "cardano-node port")] - port: u16, - #[structopt(long, default_value = "764824073", help = "network magic.")] - network_magic: u64, - #[structopt(long, help = "Exit at 100% sync'd.")] - no_service: bool, - #[structopt( - short, - long, - default_value = "1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81", - help = "shelley genesis hash value" - )] - shelley_genesis_hash: String, - }, - Leaderlog { - #[structopt( - parse(from_os_str), - short, - long, - default_value = 
"./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - #[structopt(parse(from_os_str), long, help = "byron genesis json file")] - byron_genesis: PathBuf, - #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] - shelley_genesis: PathBuf, - #[structopt(long, help = "pool active stake snapshot value in lovelace")] - pool_stake: u64, - #[structopt(long, help = "total active stake snapshot value in lovelace")] - active_stake: u64, - #[structopt(long = "d", default_value = "0", help = "decentralization parameter")] - d: f64, - #[structopt(long, help = "hex string of the extra entropy value")] - extra_entropy: Option, - #[structopt( - long, - default_value = "current", - help = "Which ledger data to use. prev - previous epoch, current - current epoch, next - future epoch" - )] - ledger_set: LedgerSet, - #[structopt(long, help = "lower-case hex pool id")] - pool_id: String, - #[structopt(parse(from_os_str), long, help = "pool's vrf.skey file")] - pool_vrf_skey: PathBuf, - #[structopt( - long = "tz", - default_value = "America/Los_Angeles", - help = "TimeZone string from the IANA database - https://en.wikipedia.org/wiki/List_of_tz_database_time_zones" - )] - timezone: String, - #[structopt( - short, - long, - default_value = "praos", - help = "Consensus algorithm - Alonzo and earlier uses tpraos, Babbage uses praos, Conway uses cpraos" - )] - consensus: String, - #[structopt( - long, - env = "SHELLEY_TRANS_EPOCH", - default_value = "-1", - help = "Epoch number where we transition from Byron to Shelley. -1 means guess based on genesis files" - )] - shelley_transition_epoch: i64, - #[structopt( - long, - help = "Provide a nonce value in lower-case hex instead of calculating from the db" - )] - nonce: Option, - #[structopt( - long, - help = "Provide a specific epoch number to calculate for and ignore --ledger-set option" - )] - epoch: Option, - }, - Sendtip { - #[structopt( - parse(from_os_str), - long, - default_value = "./pooltool.json", - help = "pooltool config file for sending tips" - )] - config: PathBuf, - #[structopt( - parse(from_os_str), - long, - help = "path to cardano-node executable for gathering version info" - )] - cardano_node: PathBuf, - }, - Sendslots { - #[structopt( - parse(from_os_str), - long, - default_value = "./pooltool.json", - help = "pooltool config file for sending slots" - )] - config: PathBuf, - #[structopt( - parse(from_os_str), - short, - long, - default_value = "./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - #[structopt(parse(from_os_str), long, help = "byron genesis json file")] - byron_genesis: PathBuf, - #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] - shelley_genesis: PathBuf, - #[structopt( - long, - env = "SHELLEY_TRANS_EPOCH", - default_value = "-1", - help = "Epoch number where we transition from Byron to Shelley. 
-1 means guess based on genesis files" - )] - shelley_transition_epoch: i64, - #[structopt(long, env = "OVERRIDE_TIME", hide_env_values = true, hidden = true)] - override_time: Option, - }, - Status { - #[structopt( - parse(from_os_str), - short, - long, - default_value = "./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - #[structopt(parse(from_os_str), long, help = "byron genesis json file")] - byron_genesis: PathBuf, - #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] - shelley_genesis: PathBuf, - #[structopt( - long, - env = "SHELLEY_TRANS_EPOCH", - default_value = "-1", - help = "Epoch number where we transition from Byron to Shelley. -1 means guess based on genesis files" - )] - shelley_transition_epoch: i64, - }, - Nonce { - #[structopt( - parse(from_os_str), - short, - long, - default_value = "./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - #[structopt(parse(from_os_str), long, help = "byron genesis json file")] - byron_genesis: PathBuf, - #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] - shelley_genesis: PathBuf, - #[structopt(long, help = "hex string of the extra entropy value")] - extra_entropy: Option, - #[structopt( - long, - default_value = "current", - help = "Which ledger data to use. prev - previous epoch, current - current epoch, next - future epoch" - )] - ledger_set: LedgerSet, - #[structopt( - long, - env = "SHELLEY_TRANS_EPOCH", - default_value = "-1", - help = "Epoch number where we transition from Byron to Shelley. -1 means guess based on genesis files" - )] - shelley_transition_epoch: i64, - #[structopt( - short, - long, - default_value = "praos", - help = "Consensus algorithm - Alonzo and earlier uses tpraos, Babbage uses praos, Conway uses cpraos" - )] - consensus: String, - #[structopt( - long, - help = "Provide a specific epoch number to calculate for and ignore --ledger-set option" - )] - epoch: Option, - }, - Challenge { - #[structopt(long, help = "validating domain e.g. pooltool.io")] - domain: String, - }, - Sign { - #[structopt(parse(from_os_str), long, help = "pool's vrf.skey file")] - pool_vrf_skey: PathBuf, - #[structopt(long, help = "validating domain e.g. pooltool.io")] - domain: String, - #[structopt(long, help = "nonce value in lower-case hex")] - nonce: String, - }, - Verify { - #[structopt(parse(from_os_str), long, help = "pool's vrf.vkey file")] - pool_vrf_vkey: PathBuf, - #[structopt( - long, - help = "pool's vrf hash in hex retrieved from 'cardano-cli query pool-params...'" - )] - pool_vrf_vkey_hash: String, - #[structopt(long, help = "validating domain e.g. 
pooltool.io")] - domain: String, - #[structopt(long, help = "nonce value in lower-case hex")] - nonce: String, - #[structopt(long, help = "signature to verify in hex")] - signature: String, - }, - Snapshot { - #[structopt(parse(from_os_str), long, help = "cardano-node socket path")] - socket_path: PathBuf, - #[structopt(long, default_value = "764824073", help = "network magic.")] - network_magic: u64, - #[structopt(long, default_value = "mark", help = "Snapshot name to retrieve (mark, set, go)")] - name: String, - #[structopt( - long, - default_value = "1", - help = "The network identifier, (1 for mainnet, 0 for testnet)" - )] - network_id: u8, - #[structopt( - long, - default_value = "stake", - help = "The prefix for stake addresses, (stake for mainnet, stake_test for testnet)" - )] - stake_prefix: String, - #[structopt(long, default_value = "mark.csv", help = "The name of the output file (CSV format)")] - output_file: String, - }, - } +#[derive(Debug, StructOpt)] +pub enum Command { + Ping { + #[structopt(short, long, help = "cardano-node hostname to connect to")] + host: String, + #[structopt(short, long, default_value = "3001", help = "cardano-node port")] + port: u16, + #[structopt(long, default_value = "764824073", help = "network magic.")] + network_magic: u64, + #[structopt(short, long, default_value = "2", help = "connect timeout in seconds")] + timeout_seconds: u64, + }, + Validate { + #[structopt(long, help = "full or partial block hash to validate")] + hash: String, + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite database file" + )] + db: PathBuf, + }, + Sync { + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite database file" + )] + db: PathBuf, + #[structopt(short, long, help = "cardano-node hostname to connect to")] + host: String, + #[structopt(short, long, default_value = "3001", help = "cardano-node port")] + port: u16, + #[structopt(long, default_value = "764824073", help = "network magic.")] + network_magic: u64, + #[structopt(long, help = "Exit at 100% sync'd.")] + no_service: bool, + #[structopt( + short, + long, + default_value = "1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81", + help = "shelley genesis hash value" + )] + shelley_genesis_hash: String, + }, + Leaderlog { + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite database file" + )] + db: PathBuf, + #[structopt(parse(from_os_str), long, help = "byron genesis json file")] + byron_genesis: PathBuf, + #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] + shelley_genesis: PathBuf, + #[structopt(long, help = "pool active stake snapshot value in lovelace")] + pool_stake: u64, + #[structopt(long, help = "total active stake snapshot value in lovelace")] + active_stake: u64, + #[structopt(long = "d", default_value = "0", help = "decentralization parameter")] + d: f64, + #[structopt(long, help = "hex string of the extra entropy value")] + extra_entropy: Option, + #[structopt( + long, + default_value = "current", + help = "Which ledger data to use. 
prev - previous epoch, current - current epoch, next - future epoch" + )] + ledger_set: LedgerSet, + #[structopt(long, help = "lower-case hex pool id")] + pool_id: String, + #[structopt(parse(from_os_str), long, help = "pool's vrf.skey file")] + pool_vrf_skey: PathBuf, + #[structopt( + long = "tz", + default_value = "America/Los_Angeles", + help = "TimeZone string from the IANA database - https://en.wikipedia.org/wiki/List_of_tz_database_time_zones" + )] + timezone: String, + #[structopt( + short, + long, + default_value = "praos", + help = "Consensus algorithm - Alonzo and earlier uses tpraos, Babbage uses praos, Conway uses cpraos" + )] + consensus: String, + #[structopt( + long, + env = "SHELLEY_TRANS_EPOCH", + default_value = "-1", + help = "Epoch number where we transition from Byron to Shelley. -1 means guess based on genesis files" + )] + shelley_transition_epoch: i64, + #[structopt( + long, + help = "Provide a nonce value in lower-case hex instead of calculating from the db" + )] + nonce: Option, + #[structopt( + long, + help = "Provide a specific epoch number to calculate for and ignore --ledger-set option" + )] + epoch: Option, + }, + Sendtip { + #[structopt( + parse(from_os_str), + long, + default_value = "./pooltool.json", + help = "pooltool config file for sending tips" + )] + config: PathBuf, + #[structopt( + parse(from_os_str), + long, + help = "path to cardano-node executable for gathering version info" + )] + cardano_node: PathBuf, + }, + Sendslots { + #[structopt( + parse(from_os_str), + long, + default_value = "./pooltool.json", + help = "pooltool config file for sending slots" + )] + config: PathBuf, + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite database file" + )] + db: PathBuf, + #[structopt(parse(from_os_str), long, help = "byron genesis json file")] + byron_genesis: PathBuf, + #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] + shelley_genesis: PathBuf, + #[structopt( + long, + env = "SHELLEY_TRANS_EPOCH", + default_value = "-1", + help = "Epoch number where we transition from Byron to Shelley. -1 means guess based on genesis files" + )] + shelley_transition_epoch: i64, + #[structopt(long, env = "OVERRIDE_TIME", hide_env_values = true, hidden = true)] + override_time: Option, + }, + Status { + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite database file" + )] + db: PathBuf, + #[structopt(parse(from_os_str), long, help = "byron genesis json file")] + byron_genesis: PathBuf, + #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] + shelley_genesis: PathBuf, + #[structopt( + long, + env = "SHELLEY_TRANS_EPOCH", + default_value = "-1", + help = "Epoch number where we transition from Byron to Shelley. -1 means guess based on genesis files" + )] + shelley_transition_epoch: i64, + }, + Nonce { + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite database file" + )] + db: PathBuf, + #[structopt(parse(from_os_str), long, help = "byron genesis json file")] + byron_genesis: PathBuf, + #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] + shelley_genesis: PathBuf, + #[structopt(long, help = "hex string of the extra entropy value")] + extra_entropy: Option, + #[structopt( + long, + default_value = "current", + help = "Which ledger data to use. 
prev - previous epoch, current - current epoch, next - future epoch" + )] + ledger_set: LedgerSet, + #[structopt( + long, + env = "SHELLEY_TRANS_EPOCH", + default_value = "-1", + help = "Epoch number where we transition from Byron to Shelley. -1 means guess based on genesis files" + )] + shelley_transition_epoch: i64, + #[structopt( + short, + long, + default_value = "praos", + help = "Consensus algorithm - Alonzo and earlier uses tpraos, Babbage uses praos, Conway uses cpraos" + )] + consensus: String, + #[structopt( + long, + help = "Provide a specific epoch number to calculate for and ignore --ledger-set option" + )] + epoch: Option, + }, + Challenge { + #[structopt(long, help = "validating domain e.g. pooltool.io")] + domain: String, + }, + Sign { + #[structopt(parse(from_os_str), long, help = "pool's vrf.skey file")] + pool_vrf_skey: PathBuf, + #[structopt(long, help = "validating domain e.g. pooltool.io")] + domain: String, + #[structopt(long, help = "nonce value in lower-case hex")] + nonce: String, + }, + Verify { + #[structopt(parse(from_os_str), long, help = "pool's vrf.vkey file")] + pool_vrf_vkey: PathBuf, + #[structopt( + long, + help = "pool's vrf hash in hex retrieved from 'cardano-cli query pool-params...'" + )] + pool_vrf_vkey_hash: String, + #[structopt(long, help = "validating domain e.g. pooltool.io")] + domain: String, + #[structopt(long, help = "nonce value in lower-case hex")] + nonce: String, + #[structopt(long, help = "signature to verify in hex")] + signature: String, + }, + Snapshot { + #[structopt(parse(from_os_str), long, help = "cardano-node socket path")] + socket_path: PathBuf, + #[structopt(long, default_value = "764824073", help = "network magic.")] + network_magic: u64, + #[structopt(long, default_value = "mark", help = "Snapshot name to retrieve (mark, set, go)")] + name: String, + #[structopt( + long, + default_value = "1", + help = "The network identifier, (1 for mainnet, 0 for testnet)" + )] + network_id: u8, + #[structopt( + long, + default_value = "stake", + help = "The prefix for stake addresses, (stake for mainnet, stake_test for testnet)" + )] + stake_prefix: String, + #[structopt(long, default_value = "mark.csv", help = "The name of the output file (CSV format)")] + output_file: String, + }, +} - pub async fn start(cmd: Command) { - match cmd { - Command::Ping { - ref host, - ref port, - ref network_magic, - ref timeout_seconds, - } => { - ping::ping(&mut stdout(), host.as_str(), *port, *network_magic, *timeout_seconds).await; - } - Command::Validate { ref db, ref hash } => { - validate::validate_block(db, hash.as_str()); +pub async fn start(cmd: Command) { + match cmd { + Command::Ping { + ref host, + ref port, + ref network_magic, + ref timeout_seconds, + } => { + ping::ping(&mut stdout(), host.as_str(), *port, *network_magic, *timeout_seconds).await; + } + Command::Validate { ref db, ref hash } => { + validate::validate_block(db, hash.as_str()); + } + Command::Sync { + ref db, + ref host, + ref port, + ref network_magic, + ref no_service, + ref shelley_genesis_hash, + } => { + sync::sync( + db, + host.as_str(), + *port, + *network_magic, + shelley_genesis_hash.as_str(), + *no_service, + true, + ) + .await; + } + Command::Leaderlog { + ref db, + ref byron_genesis, + ref shelley_genesis, + ref pool_stake, + ref active_stake, + ref d, + ref extra_entropy, + ref ledger_set, + ref pool_id, + ref pool_vrf_skey, + ref timezone, + ref consensus, + ref shelley_transition_epoch, + ref nonce, + ref epoch, + } => { + if let Err(error) = 
leaderlog::calculate_leader_logs( + db, + byron_genesis, + shelley_genesis, + pool_stake, + active_stake, + d, + extra_entropy, + ledger_set, + pool_id, + pool_vrf_skey, + timezone, + false, + consensus, + shelley_transition_epoch, + nonce, + epoch, + ) { + handle_error(error); } - Command::Sync { - ref db, - ref host, - ref port, - ref network_magic, - ref no_service, - ref shelley_genesis_hash, - } => { - sync::sync( - db, - host.as_str(), - *port, - *network_magic, - shelley_genesis_hash.as_str(), - *no_service, - ) - .await; + } + Command::Nonce { + ref db, + ref byron_genesis, + ref shelley_genesis, + ref extra_entropy, + ref ledger_set, + ref shelley_transition_epoch, + ref consensus, + ref epoch, + } => { + if let Err(error) = leaderlog::calculate_leader_logs( + db, + byron_genesis, + shelley_genesis, + &0u64, + &0u64, + &0f64, + extra_entropy, + ledger_set, + "nonce", + &PathBuf::new(), + "America/Los_Angeles", + true, + consensus, + shelley_transition_epoch, + &None, + epoch, + ) { + handle_error(error); } - Command::Leaderlog { - ref db, - ref byron_genesis, - ref shelley_genesis, - ref pool_stake, - ref active_stake, - ref d, - ref extra_entropy, - ref ledger_set, - ref pool_id, - ref pool_vrf_skey, - ref timezone, - ref consensus, - ref shelley_transition_epoch, - ref nonce, - ref epoch, - } => { - if let Err(error) = leaderlog::calculate_leader_logs( - db, - byron_genesis, - shelley_genesis, - pool_stake, - active_stake, - d, - extra_entropy, - ledger_set, - pool_id, - pool_vrf_skey, - timezone, - false, - consensus, - shelley_transition_epoch, - nonce, - epoch, - ) { - handle_error(error); - } + } + Command::Sendtip { + ref config, + ref cardano_node, + } => { + if !config.exists() { + handle_error("config not found!"); + return; } - Command::Nonce { - ref db, - ref byron_genesis, - ref shelley_genesis, - ref extra_entropy, - ref ledger_set, - ref shelley_transition_epoch, - ref consensus, - ref epoch, - } => { - if let Err(error) = leaderlog::calculate_leader_logs( - db, - byron_genesis, - shelley_genesis, - &0u64, - &0u64, - &0f64, - extra_entropy, - ledger_set, - "nonce", - &PathBuf::new(), - "America/Los_Angeles", - true, - consensus, - shelley_transition_epoch, - &None, - epoch, - ) { - handle_error(error); - } + if !cardano_node.exists() { + handle_error("cardano-node not found!"); + return; } - Command::Sendtip { - ref config, - ref cardano_node, - } => { - if !config.exists() { - handle_error("config not found!"); - return; - } - if !cardano_node.exists() { - handle_error("cardano-node not found!"); - return; - } - - let pooltool_config: PooltoolConfig = get_pooltool_config(config); - let mut handles: Vec> = vec![]; - for pool in pooltool_config.pools.into_iter() { - let api_key = pooltool_config.api_key.clone(); - let cardano_node_path = cardano_node.clone(); - handles.push(thread::spawn(move || { - tokio::runtime::Runtime::new().unwrap().block_on(sync::sendtip( - pool.name, - pool.pool_id, - pool.host, - pool.port, - api_key, - &cardano_node_path, - )); - })); - } - for handle in handles { - handle.join().unwrap() - } + let pooltool_config: PooltoolConfig = pooltool::get_pooltool_config(config); + let mut handles: Vec> = vec![]; + for pool in pooltool_config.pools.into_iter() { + let api_key = pooltool_config.api_key.clone(); + let cardano_node_path = cardano_node.clone(); + handles.push(thread::spawn(move || { + tokio::runtime::Runtime::new().unwrap().block_on(sync::sendtip( + pool.name, + pool.pool_id, + pool.host, + pool.port, + api_key, + &cardano_node_path, + 
)); + })); } - Command::Sendslots { - ref config, - ref db, - ref byron_genesis, - ref shelley_genesis, - ref shelley_transition_epoch, - ref override_time, - } => { - if !config.exists() { - handle_error("config not found!"); - return; - } - let pooltool_config: PooltoolConfig = get_pooltool_config(config); - leaderlog::send_slots( - db, - byron_genesis, - shelley_genesis, - pooltool_config, - shelley_transition_epoch, - override_time, - ); - } - Command::Status { - ref db, - ref byron_genesis, - ref shelley_genesis, - ref shelley_transition_epoch, - } => { - leaderlog::status(db, byron_genesis, shelley_genesis, shelley_transition_epoch); - } - Command::Challenge { ref domain } => { - signing::create_challenge(domain); + + for handle in handles { + handle.join().unwrap() } - Command::Sign { - ref pool_vrf_skey, - ref domain, - ref nonce, - } => { - if !pool_vrf_skey.exists() { - handle_error("vrf.skey not found!"); - return; - } - signing::sign_challenge(pool_vrf_skey, domain, nonce); + } + Command::Sendslots { + ref config, + ref db, + ref byron_genesis, + ref shelley_genesis, + ref shelley_transition_epoch, + ref override_time, + } => { + if !config.exists() { + handle_error("config not found!"); + return; } - Command::Verify { - ref pool_vrf_vkey, - ref pool_vrf_vkey_hash, - ref domain, - ref nonce, - ref signature, - } => { - signing::verify_challenge(pool_vrf_vkey, pool_vrf_vkey_hash, domain, nonce, signature); + let pooltool_config: PooltoolConfig = pooltool::get_pooltool_config(config); + leaderlog::send_slots( + db, + byron_genesis, + shelley_genesis, + pooltool_config, + shelley_transition_epoch, + override_time, + ); + } + Command::Status { + ref db, + ref byron_genesis, + ref shelley_genesis, + ref shelley_transition_epoch, + } => { + leaderlog::status(db, byron_genesis, shelley_genesis, shelley_transition_epoch); + } + Command::Challenge { ref domain } => { + sign::create_challenge(domain); + } + Command::Sign { + ref pool_vrf_skey, + ref domain, + ref nonce, + } => { + if !pool_vrf_skey.exists() { + handle_error("vrf.skey not found!"); + return; } - Command::Snapshot { - ref socket_path, - ref network_magic, - ref name, - ref network_id, - ref stake_prefix, - ref output_file, - } => { - if let Err(error) = snapshot::dump( - socket_path, - *network_magic, - name.as_str(), - *network_id, - stake_prefix.as_str(), - output_file.as_str(), - ) - .await - { - handle_error(error); - } + sign::sign_challenge(pool_vrf_skey, domain, nonce); + } + Command::Verify { + ref pool_vrf_vkey, + ref pool_vrf_vkey_hash, + ref domain, + ref nonce, + ref signature, + } => { + sign::verify_challenge(pool_vrf_vkey, pool_vrf_vkey_hash, domain, nonce, signature); + } + Command::Snapshot { + ref socket_path, + ref network_magic, + ref name, + ref network_id, + ref stake_prefix, + ref output_file, + } => { + if let Err(error) = snapshot::dump( + socket_path, + *network_magic, + name.as_str(), + *network_id, + stake_prefix.as_str(), + output_file.as_str(), + ) + .await + { + handle_error(error); } } } - - fn get_pooltool_config(config: &Path) -> PooltoolConfig { - let buf = BufReader::new(File::open(config).unwrap()); - serde_json::from_reader(buf).unwrap() - } - - #[derive(Debug, Deserialize)] - pub struct PooltoolConfig { - api_key: String, - pools: Vec, - } - - #[derive(Debug, Deserialize)] - struct Pool { - name: String, - pool_id: String, - host: String, - port: u16, - } } diff --git a/src/main.rs b/src/main.rs index 9b255e6..e50d41f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use 
std::{panic, process};
 use structopt::StructOpt;
 
-use cncli::nodeclient::{self, Command};
+use cncli::Command;
 
 pub mod built_info {
     include!(concat!(env!("OUT_DIR"), "/built.rs"));
@@ -21,7 +21,9 @@ pub mod built_info {
 }
 
 #[derive(Debug, StructOpt)]
-#[structopt(name = "cncli", about = "A community-built cardano-node CLI", version = built_info::version())]
+#[structopt(
+    name = "cncli", about = "A community-built cardano-node CLI", version = built_info::version()
+)]
 struct Cli {
     #[structopt(subcommand)]
     cmd: Command,
@@ -66,7 +68,7 @@ async fn main() {
     }));
 
     let args = Cli::from_args();
-    nodeclient::start(args.cmd).await;
+    cncli::start(args.cmd).await;
 }
 
 #[cfg(test)]
diff --git a/src/nodeclient/blockstore/mod.rs b/src/nodeclient/blockstore/mod.rs
new file mode 100644
index 0000000..6e348c9
--- /dev/null
+++ b/src/nodeclient/blockstore/mod.rs
@@ -0,0 +1,20 @@
+use thiserror::Error;
+
+use crate::nodeclient::sync::BlockHeader;
+
+pub(crate) mod redb;
+pub(crate) mod sqlite;
+
+#[derive(Error, Debug)]
+pub enum Error {
+    #[error("Redb error: {0}")]
+    Redb(#[from] redb::Error),
+
+    #[error("Sqlite error: {0}")]
+    Sqlite(#[from] sqlite::Error),
+}
+
+pub(crate) trait BlockStore {
+    fn save_block(&mut self, pending_blocks: &mut Vec<BlockHeader>, shelley_genesis_hash: &str) -> Result<(), Error>;
+    fn load_blocks(&mut self) -> Option<Vec<(i64, Vec<u8>)>>;
+}
diff --git a/src/nodeclient/blockstore/redb.rs b/src/nodeclient/blockstore/redb.rs
new file mode 100644
index 0000000..8c10ce4
--- /dev/null
+++ b/src/nodeclient/blockstore/redb.rs
@@ -0,0 +1,328 @@
+use std::convert::TryInto;
+use std::path::Path;
+
+use crate::nodeclient::blockstore;
+use crate::nodeclient::blockstore::BlockStore;
+use crate::nodeclient::sync::BlockHeader;
+use pallas_crypto::hash::{Hash, Hasher};
+use pallas_crypto::nonce::rolling_nonce::RollingNonceGenerator;
+use pallas_crypto::nonce::NonceGenerator;
+use redb::{Builder, Database, ReadableTable, RepairSession, TableDefinition, TypeName, Value};
+use thiserror::Error;
+use tracing::{debug, info};
+use uuid::Uuid;
+
+#[derive(Error, Debug)]
+pub enum Error {
+    #[error("Redb error: {0}")]
+    Redb(#[from] redb::Error),
+
+    #[error("Redb db error: {0}")]
+    RedbDb(#[from] redb::DatabaseError),
+
+    #[error("Redb commit error: {0}")]
+    RedbCommit(#[from] redb::CommitError),
+
+    #[error("Redb transaction error: {0}")]
+    RedbTransaction(#[from] redb::TransactionError),
+
+    #[error("Redb table error: {0}")]
+    RedbTable(#[from] redb::TableError),
+
+    #[error("Redb storage error: {0}")]
+    RedbStorage(#[from] redb::StorageError),
+
+    #[error("Nonce error: {0}")]
+    Nonce(#[from] pallas_crypto::nonce::Error),
+}
+
+#[derive(Debug, Clone)]
+struct ChainRecord {
+    block_number: u64,
+    slot_number: u64,
+    hash: Vec<u8>,
+    prev_hash: Vec<u8>,
+    pool_id: Vec<u8>,
+    eta_v: Vec<u8>,
+    node_vkey: Vec<u8>,
+    node_vrf_vkey: Vec<u8>,
+    block_vrf_0: Vec<u8>,
+    block_vrf_1: Vec<u8>,
+    eta_vrf_0: Vec<u8>,
+    eta_vrf_1: Vec<u8>,
+    leader_vrf_0: Vec<u8>,
+    leader_vrf_1: Vec<u8>,
+    block_size: u64,
+    block_body_hash: Vec<u8>,
+    pool_opcert: Vec<u8>,
+    unknown_0: u64,
+    unknown_1: u64,
+    unknown_2: Vec<u8>,
+    protocol_major_version: u64,
+    protocol_minor_version: u64,
+    orphaned: bool,
+}
+
+impl Value for ChainRecord {
+    type SelfType<'a> = Self;
+    type AsBytes<'a> = [u8; size_of::<ChainRecord>()]
+    where
+        Self: 'a;
+
+    fn fixed_width() -> Option<usize> {
+        // dynamic sized object. not fixed width
+        None
+    }
+
+    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
+    where
+        Self: 'a,
+    {
+        // deserialize using minicbor
+        let mut decoder = minicbor::Decoder::new(data);
+        decoder.array().unwrap().unwrap();
+        let chain_record = ChainRecord {
+            block_number: decoder.u64().unwrap(),
+            slot_number: decoder.u64().unwrap(),
+            hash: decoder.bytes().unwrap().to_vec(),
+            prev_hash: decoder.bytes().unwrap().to_vec(),
+            pool_id: decoder.bytes().unwrap().to_vec(),
+            eta_v: decoder.bytes().unwrap().to_vec(),
+            node_vkey: decoder.bytes().unwrap().to_vec(),
+            node_vrf_vkey: decoder.bytes().unwrap().to_vec(),
+            block_vrf_0: decoder.bytes().unwrap().to_vec(),
+            block_vrf_1: decoder.bytes().unwrap().to_vec(),
+            eta_vrf_0: decoder.bytes().unwrap().to_vec(),
+            eta_vrf_1: decoder.bytes().unwrap().to_vec(),
+            leader_vrf_0: decoder.bytes().unwrap().to_vec(),
+            leader_vrf_1: decoder.bytes().unwrap().to_vec(),
+            block_size: decoder.u64().unwrap(),
+            block_body_hash: decoder.bytes().unwrap().to_vec(),
+            pool_opcert: decoder.bytes().unwrap().to_vec(),
+            unknown_0: decoder.u64().unwrap(),
+            unknown_1: decoder.u64().unwrap(),
+            unknown_2: decoder.bytes().unwrap().to_vec(),
+            protocol_major_version: decoder.u64().unwrap(),
+            protocol_minor_version: decoder.u64().unwrap(),
+            orphaned: decoder.bool().unwrap(),
+        };
+        chain_record
+    }
+
+    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
+    where
+        Self: 'a,
+        Self: 'b,
+    {
+        // serialize using minicbor
+        let mut writer: Vec<u8> = Vec::new();
+        let mut encoder = minicbor::Encoder::new(&mut writer);
+        encoder.array(23).unwrap();
+        encoder.u64(value.block_number).unwrap();
+        encoder.u64(value.slot_number).unwrap();
+        encoder.bytes(&value.hash).unwrap();
+        encoder.bytes(&value.prev_hash).unwrap();
+        encoder.bytes(&value.pool_id).unwrap();
+        encoder.bytes(&value.eta_v).unwrap();
+        encoder.bytes(&value.node_vkey).unwrap();
+        encoder.bytes(&value.node_vrf_vkey).unwrap();
+        encoder.bytes(&value.block_vrf_0).unwrap();
+        encoder.bytes(&value.block_vrf_1).unwrap();
+        encoder.bytes(&value.eta_vrf_0).unwrap();
+        encoder.bytes(&value.eta_vrf_1).unwrap();
+        encoder.bytes(&value.leader_vrf_0).unwrap();
+        encoder.bytes(&value.leader_vrf_1).unwrap();
+        encoder.u64(value.block_size).unwrap();
+        encoder.bytes(&value.block_body_hash).unwrap();
+        encoder.bytes(&value.pool_opcert).unwrap();
+        encoder.u64(value.unknown_0).unwrap();
+        encoder.u64(value.unknown_1).unwrap();
+        encoder.bytes(&value.unknown_2).unwrap();
+        encoder.u64(value.protocol_major_version).unwrap();
+        encoder.u64(value.protocol_minor_version).unwrap();
+        encoder.bool(value.orphaned).unwrap();
+
+        writer.try_into().unwrap()
+    }
+
+    fn type_name() -> TypeName {
+        TypeName::new(stringify!(ChainRecord))
+    }
+}
+
+const VERSION_TABLE: TableDefinition<&str, u16> = TableDefinition::new("version");
+const CHAIN_TABLE: TableDefinition<u128, ChainRecord> = TableDefinition::new("chain");
+
+pub struct RedbBlockStore {
+    db: Database,
+}
+
+impl RedbBlockStore {
+    const DB_VERSION: u16 = 1;
+
+    pub fn new(db_path: &Path) -> Result<Self, Error> {
+        let db = Builder::new()
+            .set_repair_callback(Self::repair_callback)
+            .create(db_path)?;
+        Self::migrate(&db)?;
+        Ok(Self { db })
+    }
+
+    pub fn repair_callback(session: &mut RepairSession) {
+        let progress = session.progress();
+        info!("Redb Repair progress: {:?}", progress);
+    }
+
+    fn migrate(db: &Database) -> Result<(), Error> {
+        let read_tx = db.begin_read()?;
+        let version_table = read_tx.open_table(VERSION_TABLE)?;
+        let current_version = match version_table.get("version")? {
+            Some(version) => version.value(),
+            None => 0,
+        };
+        read_tx.close()?;
+
+        if current_version < Self::DB_VERSION {
+            // Do migration
+            let write_tx = db.begin_write()?;
+            {
+                let mut version_table = write_tx.open_table(VERSION_TABLE)?;
+                info!("Migrating database from version 0 to 1");
+                version_table.insert("version", Self::DB_VERSION)?;
+            }
+            write_tx.commit()?;
+        }
+
+        Ok(())
+    }
+
+    fn redb_save_block(
+        &mut self,
+        pending_blocks: &mut Vec<BlockHeader>,
+        shelley_genesis_hash: &str,
+    ) -> Result<(), Error> {
+        let first_pending_block_number = pending_blocks.first().unwrap().block_number;
+
+        let write_tx = self.db.begin_write()?;
+        {
+            // get the last block eta_v (nonce) in the db
+            let mut chain_table = write_tx.open_table(CHAIN_TABLE)?;
+            let mut chain_iter = chain_table.iter()?;
+            let mut prev_eta_v: Hash<32> = Hash::from(hex::decode(shelley_genesis_hash).unwrap().as_slice());
+            let mut to_update: Vec<(u128, ChainRecord)> = Vec::new();
+            while let Some(chain_record) = chain_iter.next_back() {
+                let (key, chain_record) = chain_record?;
+                let chain_record: ChainRecord = chain_record.value();
+                if chain_record.block_number >= first_pending_block_number && !chain_record.orphaned {
+                    // set it to orphaned
+                    to_update.push((
+                        key.value(),
+                        ChainRecord {
+                            orphaned: true,
+                            ..chain_record.clone()
+                        },
+                    ));
+                    continue;
+                }
+                if chain_record.orphaned {
+                    continue;
+                }
+                prev_eta_v = Hash::from(chain_record.eta_v.as_slice());
+                // sanity check
+                assert_eq!(
+                    chain_record.block_number,
+                    first_pending_block_number - 1,
+                    "block_number: {}, first_pending_block_number: {}",
+                    chain_record.block_number,
+                    first_pending_block_number
+                );
+                break;
+            }
+            for (key, chain_record) in to_update {
+                chain_table.insert(key, chain_record)?;
+            }
+
+            // save the pending blocks
+            for block in pending_blocks {
+                let key = Uuid::now_v7().as_u128();
+
+                // blake2b 224 of node_vkey is the pool_id
+                let pool_id = Hasher::<224>::hash(block.node_vkey.as_slice());
+
+                // calculate rolling nonce (eta_v)
+                let mut rolling_nonce_generator = RollingNonceGenerator::new(prev_eta_v);
+                rolling_nonce_generator.apply_block(&block.eta_vrf_0)?;
+                let eta_v = rolling_nonce_generator.finalize()?;
+
+                let chain_record = ChainRecord {
+                    block_number: block.block_number,
+                    slot_number: block.slot_number,
+                    hash: block.hash.clone(),
+                    prev_hash: block.prev_hash.clone(),
+                    pool_id: pool_id.to_vec(),
+                    eta_v: eta_v.to_vec(),
+                    node_vkey: block.node_vkey.clone(),
+                    node_vrf_vkey: block.node_vrf_vkey.clone(),
+                    block_vrf_0: block.block_vrf_0.clone(),
+                    block_vrf_1: block.block_vrf_1.clone(),
+                    eta_vrf_0: block.eta_vrf_0.clone(),
+                    eta_vrf_1: block.eta_vrf_1.clone(),
+                    leader_vrf_0: block.leader_vrf_0.clone(),
+                    leader_vrf_1: block.leader_vrf_1.clone(),
+                    block_size: block.block_size,
+                    block_body_hash: block.block_body_hash.clone(),
+                    pool_opcert: block.pool_opcert.clone(),
+                    unknown_0: block.unknown_0,
+                    unknown_1: block.unknown_1,
+                    unknown_2: block.unknown_2.clone(),
+                    protocol_major_version: block.protocol_major_version,
+                    protocol_minor_version: block.protocol_minor_version,
+                    orphaned: false,
+                };
+                chain_table.insert(key, chain_record)?;
+
+                prev_eta_v = eta_v;
+            }
+        }
+        write_tx.commit()?;
+
+        Ok(())
+    }
+}
+
+impl BlockStore for RedbBlockStore {
+    fn save_block(
+        &mut self,
+        pending_blocks: &mut Vec<BlockHeader>,
+        shelley_genesis_hash: &str,
+    ) -> Result<(), blockstore::Error> {
+        Ok(self.redb_save_block(pending_blocks, shelley_genesis_hash)?)
+    }
+
+    fn load_blocks(&mut self) -> Option<Vec<(i64, Vec<u8>)>> {
+        let read_tx = self.db.begin_read().ok()?;
+        // get slot_number and hash from chain table ordering by slot_number descending where orphaned is false
+        // limit the result to 33 records
+        let chain_table = read_tx.open_table(CHAIN_TABLE).ok()?;
+        let mut chain_iter = chain_table.iter().ok()?;
+        let mut blocks: Vec<(i64, Vec<u8>)> = Vec::new();
+        while let Some(record) = chain_iter.next_back() {
+            let (_, chain_record) = record.ok()?;
+            let chain_record: ChainRecord = chain_record.value();
+            debug!("chain_record: {:?}", chain_record);
+            if chain_record.orphaned {
+                continue;
+            }
+            let slot_number = chain_record.slot_number;
+            let hash = chain_record.hash.clone();
+            blocks.push((slot_number as i64, hash));
+            if blocks.len() >= 33 {
+                break;
+            }
+        }
+
+        read_tx.close().ok()?;
+
+        Some(blocks)
+    }
+}
diff --git a/src/nodeclient/sqlite.rs b/src/nodeclient/blockstore/sqlite.rs
similarity index 94%
rename from src/nodeclient/sqlite.rs
rename to src/nodeclient/blockstore/sqlite.rs
index af0418d..381cf58 100644
--- a/src/nodeclient/sqlite.rs
+++ b/src/nodeclient/blockstore/sqlite.rs
@@ -1,16 +1,16 @@
-use std::io;
 use std::path::Path;
 
+use crate::nodeclient::blockstore;
+use crate::nodeclient::blockstore::BlockStore;
+use crate::nodeclient::sync::BlockHeader;
 use blake2b_simd::Params;
 use log::{debug, error, info};
 use pallas_crypto::hash::Hash;
-use pallas_crypto::nonce::NonceGenerator;
 use pallas_crypto::nonce::rolling_nonce::RollingNonceGenerator;
-use rusqlite::{Connection, named_params};
+use pallas_crypto::nonce::NonceGenerator;
+use rusqlite::{named_params, Connection};
 use thiserror::Error;
 
-use crate::nodeclient::sync::BlockHeader;
-
 #[derive(Error, Debug)]
 pub enum Error {
     #[error("SQLite error: {0}")]
@@ -20,11 +20,6 @@
     Nonce(#[from] pallas_crypto::nonce::Error),
 }
 
-pub trait BlockStore {
-    fn save_block(&mut self, pending_blocks: &mut Vec<BlockHeader>, shelley_genesis_hash: &str) -> io::Result<()>;
-    fn load_blocks(&mut self) -> Option<Vec<(i64, Vec<u8>)>>;
-}
-
 pub struct SqLiteBlockStore {
     pub db: Connection,
 }
@@ -202,7 +197,9 @@ impl SqLiteBlockStore {
                     shelley_genesis_hash.to_string()
                 }
            },
-            ).unwrap().as_slice()
+            )
+            .unwrap()
+            .as_slice(),
         );
 
         let tx = db.transaction()?;
@@ -277,13 +274,15 @@ impl SqLiteBlockStore {
                            shelley_genesis_hash.to_string()
                        }
                    },
-                    ).unwrap().as_slice()
+                    )
+                    .unwrap()
+                    .as_slice(),
                 );
            }
            // calculate rolling nonce (eta_v)
            let mut rolling_nonce_generator = RollingNonceGenerator::new(prev_eta_v);
            rolling_nonce_generator.apply_block(&block.eta_vrf_0)?;
-            prev_eta_v = rolling_nonce_generator.finalize()?;
+            let eta_v = rolling_nonce_generator.finalize()?;
 
            // blake2b 224 of node_vkey is the pool_id
            let pool_id = Params::new()
@@ -300,7 +299,7 @@ impl SqLiteBlockStore {
                ":hash" : hex::encode(block.hash),
                ":prev_hash" : hex::encode(block.prev_hash),
                ":pool_id" : hex::encode(pool_id),
-                ":eta_v" : hex::encode(prev_eta_v),
+                ":eta_v" : hex::encode(eta_v),
                ":node_vkey" : hex::encode(block.node_vkey),
                ":node_vrf_vkey" : hex::encode(block.node_vrf_vkey),
                ":block_vrf_0": hex::encode(block.block_vrf_0),
@@ -318,6 +317,8 @@
                ":protocol_major_version" : block.protocol_major_version,
                ":protocol_minor_version" : block.protocol_minor_version,
            })?;
+
+            prev_eta_v = eta_v;
        }
    }
 
@@ -327,11 +328,12 @@
    }
 
 impl BlockStore for SqLiteBlockStore {
-    fn save_block(&mut self, pending_blocks: &mut Vec<BlockHeader>, shelley_genesis_hash: &str) -> io::Result<()> {
-        match self.sql_save_block(pending_blocks,
shelley_genesis_hash) { - Ok(_) => Ok(()), - Err(error) => Err(io::Error::new(io::ErrorKind::Other, format!("Database error!: {:?}", error))), - } + fn save_block( + &mut self, + pending_blocks: &mut Vec, + shelley_genesis_hash: &str, + ) -> Result<(), blockstore::Error> { + Ok(self.sql_save_block(pending_blocks, shelley_genesis_hash)?) } fn load_blocks(&mut self) -> Option)>> { diff --git a/src/nodeclient/math.rs b/src/nodeclient/leaderlog/math.rs similarity index 100% rename from src/nodeclient/math.rs rename to src/nodeclient/leaderlog/math.rs diff --git a/src/nodeclient/leaderlog.rs b/src/nodeclient/leaderlog/mod.rs similarity index 99% rename from src/nodeclient/leaderlog.rs rename to src/nodeclient/leaderlog/mod.rs index 30fe5ba..ebb474d 100644 --- a/src/nodeclient/leaderlog.rs +++ b/src/nodeclient/leaderlog/mod.rs @@ -4,6 +4,11 @@ use std::io::{stdout, BufReader}; use std::path::Path; use std::str::FromStr; +use crate::nodeclient::leaderlog::deserialize::cbor_hex; +use crate::nodeclient::leaderlog::ledgerstate::calculate_ledger_state_sigma_d_and_extra_entropy; +use crate::nodeclient::leaderlog::libsodium::{sodium_crypto_vrf_proof_to_hash, sodium_crypto_vrf_prove}; +use crate::nodeclient::leaderlog::math::{ln, normalize, round, taylor_exp_cmp, TaylorCmp}; +use crate::{LedgerSet, PooltoolConfig}; use bigdecimal::{BigDecimal, FromPrimitive, One, ToPrimitive}; use blake2b_simd::Params; use byteorder::{ByteOrder, NetworkEndian}; @@ -19,15 +24,10 @@ use serde::{Deserialize, Serialize}; use serde_aux::prelude::deserialize_number_from_string; use thiserror::Error; -use crate::nodeclient::leaderlog::deserialize::cbor_hex; -use crate::nodeclient::leaderlog::ledgerstate::calculate_ledger_state_sigma_d_and_extra_entropy; -use crate::nodeclient::leaderlog::libsodium::{sodium_crypto_vrf_proof_to_hash, sodium_crypto_vrf_prove}; -use crate::nodeclient::math::{ln, normalize, round, taylor_exp_cmp, TaylorCmp}; -use crate::nodeclient::{LedgerSet, PooltoolConfig}; - mod deserialize; -pub mod ledgerstate; +mod ledgerstate; pub(crate) mod libsodium; +mod math; #[derive(Error, Debug)] pub enum Error { diff --git a/src/nodeclient/mod.rs b/src/nodeclient/mod.rs new file mode 100644 index 0000000..c02e238 --- /dev/null +++ b/src/nodeclient/mod.rs @@ -0,0 +1,7 @@ +pub(crate) mod blockstore; +pub(crate) mod leaderlog; +pub(crate) mod ping; +pub(crate) mod sign; +pub(crate) mod snapshot; +pub(crate) mod sync; +pub(crate) mod validate; diff --git a/src/nodeclient/ping.rs b/src/nodeclient/ping/mod.rs similarity index 100% rename from src/nodeclient/ping.rs rename to src/nodeclient/ping/mod.rs diff --git a/src/nodeclient/signing.rs b/src/nodeclient/sign/mod.rs similarity index 100% rename from src/nodeclient/signing.rs rename to src/nodeclient/sign/mod.rs diff --git a/src/nodeclient/snapshot.rs b/src/nodeclient/snapshot/mod.rs similarity index 91% rename from src/nodeclient/snapshot.rs rename to src/nodeclient/snapshot/mod.rs index a57e85b..3b8a70c 100644 --- a/src/nodeclient/snapshot.rs +++ b/src/nodeclient/snapshot/mod.rs @@ -22,23 +22,20 @@ pub enum Error { #[error("Unexpected array length: expected {expected}, got {actual}")] UnexpectedArrayLength { expected: u64, actual: u64 }, - #[error("Unexpected map length: expected {expected}, got {actual}")] - UnexpectedMapLength { expected: u64, actual: u64 }, - #[error("Unexpected Cbor Type: {value:?}")] UnexpectedCborType { value: Type }, #[error(transparent)] - Bech32Error(#[from] bech32::primitives::hrp::Error), + Bech32(#[from] bech32::primitives::hrp::Error), 
#[error(transparent)] - Bech32EncodingError(#[from] bech32::EncodeError), + Bech32Encoding(#[from] bech32::EncodeError), #[error(transparent)] - IoError(#[from] std::io::Error), + Io(#[from] std::io::Error), #[error("Snapshot error: {0}")] - SnapshotError(String), + Snapshot(String), } #[derive(Debug)] @@ -63,7 +60,7 @@ pub(crate) async fn dump( "mark" => Snapshot::Mark, "set" => Snapshot::Set, "go" => Snapshot::Go, - _ => return Err(Error::SnapshotError(format!("Unknown snapshot name: {}", name))), + _ => return Err(Error::Snapshot(format!("Unknown snapshot name: {}", name))), }; let client = client.statequery(); @@ -160,7 +157,7 @@ pub(crate) async fn dump( let stake_key_prefix = [match address_type { 0 => 0xe0u8, // key-based stake address 1 => 0xf0u8, // script-based stake address - _ => return Err(Error::SnapshotError(format!("Unknown address type: {}", address_type))), + _ => return Err(Error::Snapshot(format!("Unknown address type: {}", address_type))), } | network_id]; let stake_key_bytes = decoder.bytes()?; let stake_key_bytes = [&stake_key_prefix, stake_key_bytes].concat(); diff --git a/src/nodeclient/sync.rs b/src/nodeclient/sync/mod.rs similarity index 93% rename from src/nodeclient/sync.rs rename to src/nodeclient/sync/mod.rs index bfd99d3..b672dda 100644 --- a/src/nodeclient/sync.rs +++ b/src/nodeclient/sync/mod.rs @@ -4,8 +4,10 @@ use std::ops::Sub; use std::path::Path; use std::time::{Duration, Instant}; -use thiserror::Error; - +use crate::nodeclient::blockstore; +use crate::nodeclient::blockstore::redb::RedbBlockStore; +use crate::nodeclient::blockstore::sqlite::SqLiteBlockStore; +use crate::nodeclient::blockstore::BlockStore; use log::{debug, error, info, warn}; use pallas_network::facades::{KeepAliveLoop, PeerClient, DEFAULT_KEEP_ALIVE_INTERVAL_SEC}; use pallas_network::miniprotocols::chainsync::{HeaderContent, NextResponse, Tip}; @@ -16,40 +18,34 @@ use pallas_network::miniprotocols::{ }; use pallas_network::multiplexer::{Bearer, Plexer}; use pallas_traverse::MultiEraHeader; +use thiserror::Error; -use crate::nodeclient::pooltool; -use crate::nodeclient::sqlite; -use crate::nodeclient::sqlite::BlockStore; - -use super::sqlite::SqLiteBlockStore; +pub(crate) mod pooltool; const FIVE_SECS: Duration = Duration::from_secs(5); #[derive(Error, Debug)] pub enum Error { - #[error("loggingobserver error occurred")] - LoggingObserverError(String), - #[error("pallas_traverse error occurred: {0}")] - PallasTraverseError(#[from] pallas_traverse::Error), + PallasTraverse(#[from] pallas_traverse::Error), #[error("io error occurred: {0}")] - IoError(#[from] std::io::Error), + Io(#[from] std::io::Error), #[error("keepalive error occurred: {0}")] - KeepAliveError(#[from] keepalive::ClientError), + KeepAlive(#[from] keepalive::ClientError), #[error("chainsync error occurred: {0}")] - ChainSyncError(#[from] chainsync::ClientError), + ChainSync(#[from] chainsync::ClientError), - #[error("chainsync canceled")] - ChainSyncCanceled, + #[error("blockstore error occurred: {0}")] + BlockStore(#[from] blockstore::Error), } #[derive(Debug, Clone)] -pub struct BlockHeader { - pub block_number: i64, - pub slot_number: i64, +pub(crate) struct BlockHeader { + pub block_number: u64, + pub slot_number: u64, pub hash: Vec, pub prev_hash: Vec, pub node_vkey: Vec, @@ -60,14 +56,14 @@ pub struct BlockHeader { pub eta_vrf_1: Vec, pub leader_vrf_0: Vec, pub leader_vrf_1: Vec, - pub block_size: i64, + pub block_size: u64, pub block_body_hash: Vec, pub pool_opcert: Vec, - pub unknown_0: i64, - pub unknown_1: 
i64, + pub unknown_0: u64, + pub unknown_1: u64, pub unknown_2: Vec, - pub protocol_major_version: i64, - pub protocol_minor_version: i64, + pub protocol_major_version: u64, + pub protocol_minor_version: u64, } struct LoggingObserver { @@ -123,8 +119,8 @@ impl Observer for LoggingObserver { MultiEraHeader::ShelleyCompatible(header) => { //sqlite only handles signed values so some casting is done here self.pending_blocks.push(BlockHeader { - block_number: header.header_body.block_number as i64, - slot_number: slot as i64, + block_number: header.header_body.block_number, + slot_number: slot, hash: hash.to_vec(), prev_hash: match header.header_body.prev_hash { None => vec![], @@ -138,14 +134,14 @@ impl Observer for LoggingObserver { eta_vrf_1: header.header_body.nonce_vrf.1.to_vec(), leader_vrf_0: leader_vrf_output, leader_vrf_1: header.header_body.leader_vrf.1.to_vec(), - block_size: header.header_body.block_body_size as i64, + block_size: header.header_body.block_body_size, block_body_hash: header.header_body.block_body_hash.to_vec(), pool_opcert: header.header_body.operational_cert_hot_vkey.to_vec(), - unknown_0: header.header_body.operational_cert_sequence_number as i64, - unknown_1: header.header_body.operational_cert_kes_period as i64, + unknown_0: header.header_body.operational_cert_sequence_number, + unknown_1: header.header_body.operational_cert_kes_period, unknown_2: header.header_body.operational_cert_sigma.to_vec(), - protocol_major_version: header.header_body.protocol_major as i64, - protocol_minor_version: header.header_body.protocol_minor as i64, + protocol_major_version: header.header_body.protocol_major, + protocol_minor_version: header.header_body.protocol_minor, }); let block_number: f64 = header.header_body.block_number as f64; let tip_block_number: f64 = tip.1 as f64; @@ -174,8 +170,8 @@ impl Observer for LoggingObserver { MultiEraHeader::BabbageCompatible(header) => { //sqlite only handles signed values so some casting is done here self.pending_blocks.push(BlockHeader { - block_number: header.header_body.block_number as i64, - slot_number: slot as i64, + block_number: header.header_body.block_number, + slot_number: slot, hash: hash.to_vec(), prev_hash: match header.header_body.prev_hash { None => vec![], @@ -189,15 +185,14 @@ impl Observer for LoggingObserver { eta_vrf_1: vec![], leader_vrf_0: leader_vrf_output, leader_vrf_1: vec![], - block_size: header.header_body.block_body_size as i64, + block_size: header.header_body.block_body_size, block_body_hash: header.header_body.block_body_hash.to_vec(), pool_opcert: header.header_body.operational_cert.operational_cert_hot_vkey.to_vec(), - unknown_0: header.header_body.operational_cert.operational_cert_sequence_number - as i64, - unknown_1: header.header_body.operational_cert.operational_cert_kes_period as i64, + unknown_0: header.header_body.operational_cert.operational_cert_sequence_number, + unknown_1: header.header_body.operational_cert.operational_cert_kes_period, unknown_2: header.header_body.operational_cert.operational_cert_sigma.to_vec(), - protocol_major_version: header.header_body.protocol_version.0 as i64, - protocol_minor_version: header.header_body.protocol_version.1 as i64, + protocol_major_version: header.header_body.protocol_version.0, + protocol_minor_version: header.header_body.protocol_version.1, }); let block_number: f64 = header.header_body.block_number as f64; let tip_block_number = max(header.header_body.block_number, tip.1); @@ -257,7 +252,7 @@ impl Observer for LoggingObserver { } } -fn 
get_intersect_blocks(block_store: &mut SqLiteBlockStore) -> Result<Vec<Point>, Error> {
+fn get_intersect_blocks(block_store: &mut Box<dyn BlockStore>) -> Result<Vec<Point>, Error> {
     let start = Instant::now();
     debug!("get_intersect_blocks");
 
@@ -363,10 +358,15 @@ pub(crate) async fn sync(
     network_magic: u64,
     shelley_genesis_hash: &str,
     no_service: bool,
+    use_redb: bool,
 ) {
     loop {
         // Retry to establish connection forever
-        let mut block_store = sqlite::SqLiteBlockStore::new(db).unwrap();
+        let mut block_store: Box<dyn BlockStore> = if use_redb {
+            Box::new(RedbBlockStore::new(db).unwrap())
+        } else {
+            Box::new(SqLiteBlockStore::new(db).unwrap())
+        };
         let chain_blocks = get_intersect_blocks(&mut block_store).unwrap();
         match Bearer::connect_tcp_timeout(
             &format!("{host}:{port}").to_socket_addrs().unwrap().next().unwrap(),
@@ -421,7 +421,7 @@
                         false,
                         no_service,
                         Some(chain_blocks),
-                        Some(Box::new(block_store)),
+                        Some(block_store),
                         shelley_genesis_hash,
                     )
                     .await
diff --git a/src/nodeclient/pooltool.rs b/src/nodeclient/sync/pooltool.rs
similarity index 89%
rename from src/nodeclient/pooltool.rs
rename to src/nodeclient/sync/pooltool.rs
index e63bfde..c169407 100644
--- a/src/nodeclient/pooltool.rs
+++ b/src/nodeclient/sync/pooltool.rs
@@ -1,16 +1,38 @@
+use std::fs::File;
+use std::io::BufReader;
 use std::ops::Sub;
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 use std::process::{Command, Stdio};
 use std::time::{Duration, Instant};
 
 use chrono::{SecondsFormat, Utc};
 use log::{error, info};
 use regex::Regex;
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
 
-use crate::nodeclient::sqlite::BlockStore;
+use crate::nodeclient::blockstore;
+use crate::nodeclient::blockstore::BlockStore;
 use crate::nodeclient::sync::BlockHeader;
-use crate::nodeclient::APP_USER_AGENT;
+use crate::APP_USER_AGENT;
+
+pub(crate) fn get_pooltool_config(config: &Path) -> PooltoolConfig {
+    let buf = BufReader::new(File::open(config).unwrap());
+    serde_json::from_reader(buf).unwrap()
+}
+
+#[derive(Debug, Deserialize)]
+pub(crate) struct PooltoolConfig {
+    pub(crate) api_key: String,
+    pub(crate) pools: Vec<Pool>,
+}
+
+#[derive(Debug, Deserialize)]
+pub(crate) struct Pool {
+    pub(crate) name: String,
+    pub(crate) pool_id: String,
+    pub(crate) host: String,
+    pub(crate) port: u16,
+}
 
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
@@ -26,15 +48,15 @@ struct PooltoolData0 {
     node_id: String,
     version: String,
     at: String,
-    block_no: i64,
-    slot_no: i64,
+    block_no: u64,
+    slot_no: u64,
     block_hash: String,
     parent_hash: String,
     leader_vrf: String,
     leader_vrf_proof: String,
     node_v_key: String,
-    protocol_major_version: i64,
-    protocol_minor_version: i64,
+    protocol_major_version: u64,
+    protocol_minor_version: u64,
     platform: String,
 }
 
@@ -52,16 +74,16 @@ struct PooltoolData1 {
     node_id: String,
     version: String,
     at: String,
-    block_no: i64,
-    slot_no: i64,
+    block_no: u64,
+    slot_no: u64,
     block_hash: String,
     parent_hash: String,
     leader_vrf: String,
     block_vrf: String,
     block_vrf_proof: String,
     node_v_key: String,
-    protocol_major_version: i64,
-    protocol_minor_version: i64,
+    protocol_major_version: u64,
+    protocol_minor_version: u64,
     platform: String,
 }
 
@@ -207,7 +229,7 @@ impl BlockStore for PoolToolNotifier {
         &mut self,
         pending_blocks: &mut Vec<BlockHeader>,
         _shelley_genesis_hash: &str,
-    ) -> std::io::Result<()> {
+    ) -> Result<(), blockstore::Error> {
         self.send_to_pooltool(pending_blocks.last().unwrap());
         Ok(())
     }
diff --git a/src/nodeclient/validate.rs b/src/nodeclient/validate/mod.rs
similarity index 98%
rename from
src/nodeclient/validate.rs rename to src/nodeclient/validate/mod.rs index f8cdb24..5046406 100644 --- a/src/nodeclient/validate.rs +++ b/src/nodeclient/validate/mod.rs @@ -51,6 +51,8 @@ fn query_block(db_path: &Path, like: String) -> Result { return Err(Error::InvalidPath(db_path.to_path_buf())); } + //FIXME: Use either blockstore type + let db = Connection::open(db_path)?; let query_result = db.query_row( "SELECT block_number,slot_number,hash,prev_hash,pool_id,leader_vrf_0,orphaned FROM chain WHERE hash LIKE ? ORDER BY orphaned ASC", diff --git a/src/test.rs b/src/test.rs index 83a7f74..3c48e79 100644 --- a/src/test.rs +++ b/src/test.rs @@ -5,7 +5,7 @@ use chrono::{NaiveDateTime, Utc}; use num_rational::BigRational; use regex::Regex; -use cncli::nodeclient::math::{ceiling, exp, find_e, ln, round, split_ln}; +use cncli::nodeclient::leaderlog::math::{ceiling, exp, find_e, ln, round, split_ln}; use cncli::nodeclient::ping; use nodeclient::leaderlog::is_overlay_slot; use nodeclient::math::ipow;
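A note on the nonce bookkeeping that the sqlite and redb block stores above now share: starting from the shelley genesis hash (or the last non-orphaned block's eta_v), each new header's eta_vrf_0 is folded in to produce the next eta_v, and the blake2b-224 of the header's node_vkey becomes the stored pool_id. The sketch below restates that flow using only the pallas-crypto calls already visible in this diff; the free-standing helper names and signatures are illustrative assumptions, not code from this change.

    use pallas_crypto::hash::{Hash, Hasher};
    use pallas_crypto::nonce::rolling_nonce::RollingNonceGenerator;
    use pallas_crypto::nonce::NonceGenerator;

    // Roll the previous eta_v forward with a header's nonce VRF output (eta_vrf_0),
    // mirroring the sequence used in redb_save_block and sql_save_block.
    fn roll_eta_v(prev_eta_v: Hash<32>, eta_vrf_0: &[u8]) -> Result<Hash<32>, pallas_crypto::nonce::Error> {
        let mut generator = RollingNonceGenerator::new(prev_eta_v);
        generator.apply_block(eta_vrf_0)?;
        generator.finalize()
    }

    // The pool_id stored with each block is the blake2b-224 hash of the block's node_vkey.
    fn pool_id_from_vkey(node_vkey: &[u8]) -> Hash<28> {
        Hasher::<224>::hash(node_vkey)
    }

Feeding each saved block's computed eta_v back in as the next prev_eta_v is what keeps the rolling nonce in the chain table consistent across both backends.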