diff --git a/Cargo.lock b/Cargo.lock index b45c6a4..7e18b22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -316,15 +316,15 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" [[package]] name = "cc" -version = "1.1.19" +version = "1.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d74707dde2ba56f86ae90effb3b43ddd369504387e718014de010cec7959800" +checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0" dependencies = [ "jobserver", "libc", @@ -401,6 +401,7 @@ dependencies = [ "futures", "hex", "itertools 0.13.0", + "log", "malachite", "malachite-base", "minicbor 0.25.1", @@ -950,9 +951,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.60" +version = "0.1.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -1255,9 +1256,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.20.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ea5043e58958ee56f3e15a90aee535795cd7dfd319846288d93c5b57d85cbe" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "opaque-debug" @@ -2254,9 +2255,9 @@ checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" [[package]] name = "unicode-normalization" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" dependencies = [ "tinyvec", ] @@ -2269,9 +2270,9 @@ checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" [[package]] name = "unicode-width" -version = "0.1.13" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" [[package]] name = "untrusted" @@ -2441,9 +2442,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.26.5" +version = "0.26.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bd24728e5af82c6c4ec1b66ac4844bdf8156257fccda846ec58b42cd0cdbe6a" +checksum = "841c67bff177718f1d4dfefde8d8f0e78f9b6589319ba88312f567fc5841a958" dependencies = [ "rustls-pki-types", ] diff --git a/Cargo.toml b/Cargo.toml index 087e3aa..0951289 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ thiserror = "1.0" tracing = "0.1" tracing-subscriber = "0.3" uuid = { version = "1", features = ["v7"] } +log = "0.4.22" [build-dependencies] built = { version = "0.7", features = ["git2"] } diff --git a/build.rs b/build.rs index bf1d961..c202570 100644 --- a/build.rs +++ b/build.rs @@ -1,7 +1,3 @@ -use std::env; -use std::path::Path; -use std::process::Command; - fn main() { built::write_built_file().expect("Failed to acquire build-time information"); diff --git a/src/nodeclient/blockstore/mod.rs b/src/nodeclient/blockstore/mod.rs index 8814cc9..dcb7ea7 100644 --- a/src/nodeclient/blockstore/mod.rs +++ b/src/nodeclient/blockstore/mod.rs @@ -38,4 +38,11 @@ pub(crate) trait BlockStore { fn get_tip_slot_number(&mut self) -> Result; fn get_eta_v_before_slot(&mut self, slot_number: u64) -> Result, Error>; fn get_prev_hash_before_slot(&mut self, slot_number: u64) -> Result, Error>; + fn save_slots(&mut self, epoch: u64, pool_id: &str, slot_qty: u64, slots: &str, hash: &str) -> Result<(), Error>; + + /// Get the number of slots and the hash from the block store for the epoch and pool_id + fn get_current_slots(&mut self, epoch: u64, pool_id: &str) -> Result<(u64, String), Error>; + + /// Get the previous slots list raw data String from the block store for the epoch and pool_id + fn get_previous_slots(&mut self, epoch: u64, pool_id: &str) -> Result, Error>; } diff --git a/src/nodeclient/blockstore/redb.rs b/src/nodeclient/blockstore/redb.rs index 06a50a8..10653ff 100644 --- a/src/nodeclient/blockstore/redb.rs +++ b/src/nodeclient/blockstore/redb.rs @@ -1,17 +1,16 @@ -use std::io::Read; -use std::path::Path; +use crate::nodeclient::blockstore; +use crate::nodeclient::blockstore::{Block, BlockStore}; +use crate::nodeclient::sync::BlockHeader; use pallas_crypto::hash::{Hash, Hasher}; use pallas_crypto::nonce::generate_rolling_nonce; use redb::{Builder, Database, MultimapTableDefinition, MultimapValue, ReadableMultimapTable, ReadableTable, RepairSession, TableDefinition, TypeName, Value}; use serde::{Deserialize, Serialize}; +use std::io::Read; +use std::path::Path; use thiserror::Error; -use tracing::info; +use tracing::{debug, info}; use uuid::Uuid; -use crate::nodeclient::blockstore; -use crate::nodeclient::blockstore::{Block, BlockStore}; -use crate::nodeclient::sync::BlockHeader; - #[derive(Error, Debug)] pub enum Error { #[error("IO error: {0}")] @@ -37,7 +36,7 @@ pub enum Error { #[error("FromHex error: {0}")] FromHex(#[from] hex::FromHexError), - + #[error("Data not found")] DataNotFound, } @@ -100,6 +99,46 @@ impl Value for ChainRecord { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SlotsRecord { + epoch: u64, + pool_id: Vec, + slot_qty: u64, + slots: String, + hash: Vec, +} + +impl Value for SlotsRecord { + type SelfType<'a> = Self; + type AsBytes<'a> = Vec + where + Self: 'a; + + fn fixed_width() -> Option { + // dynamic sized object. not fixed width + None + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + bincode::deserialize(data).unwrap() + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + bincode::serialize(value).unwrap() + } + + fn type_name() -> TypeName { + TypeName::new(stringify!(SlotsRecord)) + } +} + // magic number must be set to the ASCII letters 'redb' followed by 0x1A, 0x0A, 0xA9, 0x0D, 0x0A. // This sequence is inspired by the PNG magic number. const MAGIC_NUMBER: &[u8; 9] = b"redb\x1A\x0A\xA9\x0D\x0A"; @@ -108,10 +147,12 @@ const VERSION_TABLE: TableDefinition<&str, u16> = TableDefinition::new("version" const CHAIN_TABLE: TableDefinition = TableDefinition::new("chain"); const CHAIN_TABLE_SLOT_INDEX: MultimapTableDefinition = MultimapTableDefinition::new("chain_slot_index"); const CHAIN_TABLE_HASH_INDEX: MultimapTableDefinition<&[u8], u128> = MultimapTableDefinition::new("chain_hash_index"); +const SLOTS_TABLE: TableDefinition = TableDefinition::new("slots"); +const SLOTS_TABLE_POOL_ID_EPOCH_INDEX: TableDefinition<&[u8], u128> = TableDefinition::new("slots_pool_id_epoch_index"); pub(crate) fn is_redb_database(db_path: &Path) -> Result { - let mut file = std::fs::File::open(db_path).unwrap(); + let mut file = std::fs::File::open(db_path)?; let mut magic_number = [0u8; 9]; file.read_exact(&mut magic_number)?; Ok(&magic_number == MAGIC_NUMBER) @@ -158,6 +199,9 @@ impl RedbBlockStore { write_tx.open_table(CHAIN_TABLE)?; write_tx.open_multimap_table(CHAIN_TABLE_SLOT_INDEX)?; write_tx.open_multimap_table(CHAIN_TABLE_HASH_INDEX)?; + // create the slots table if it doesn't exist + write_tx.open_table(SLOTS_TABLE)?; + write_tx.open_table(SLOTS_TABLE_POOL_ID_EPOCH_INDEX)?; } write_tx.commit()?; } @@ -179,7 +223,7 @@ impl RedbBlockStore { let mut chain_table_slot_index = write_tx.open_multimap_table(CHAIN_TABLE_SLOT_INDEX)?; let mut chain_table_hash_index = write_tx.open_multimap_table(CHAIN_TABLE_HASH_INDEX)?; let mut chain_iter = chain_table.iter()?; - let mut prev_eta_v: Hash<32> = Hash::from(hex::decode(shelley_genesis_hash).unwrap().as_slice()); + let mut prev_eta_v: Hash<32> = shelley_genesis_hash.parse()?; let mut to_update: Vec<(u128, ChainRecord)> = Vec::new(); while let Some(chain_record) = chain_iter.next_back() { @@ -313,20 +357,26 @@ impl RedbBlockStore { let read_tx = self.db.begin_read()?; let chain_table_slot_index = read_tx.open_multimap_table(CHAIN_TABLE_SLOT_INDEX)?; let mut iter = chain_table_slot_index.iter()?; - while let Some(result) = iter.next_back() { + if let Some(result) = iter.next_back() { let (slot_number, _) = result?; return Ok(slot_number.value()); } Ok(0) } - + fn redb_get_eta_v_before_slot(&mut self, slot_number: u64) -> Result, Error> { let read_tx = self.db.begin_read()?; let chain_table_slot_index = read_tx.open_multimap_table(CHAIN_TABLE_SLOT_INDEX)?; let chain_table = read_tx.open_table(CHAIN_TABLE)?; - for slot_number in (0..(slot_number-1)).rev() { + for slot_number in (0..slot_number).rev() { let mut chain_keys: MultimapValue = match chain_table_slot_index.get(slot_number) { - Ok(keys) => keys, + Ok(keys) => { + if !keys.is_empty() { + keys + } else { + continue; + } + } Err(_) => continue, }; let eta_v: Option> = chain_keys.find_map(|key| { @@ -342,24 +392,30 @@ impl RedbBlockStore { return Ok(eta_v); } } - + Err(Error::DataNotFound) } - + fn redb_get_prev_hash_before_slot(&mut self, slot_number: u64) -> Result, Error> { let read_tx = self.db.begin_read()?; let chain_table_slot_index = read_tx.open_multimap_table(CHAIN_TABLE_SLOT_INDEX)?; let chain_table = read_tx.open_table(CHAIN_TABLE)?; - for slot_number in (0..(slot_number-1)).rev() { + for slot_number in (0..slot_number).rev() { let mut chain_keys: MultimapValue = match chain_table_slot_index.get(slot_number) { - Ok(keys) => keys, + Ok(keys) => { + if !keys.is_empty() { + keys + } else { + continue; + } + } Err(_) => continue, }; let prev_hash: Option> = chain_keys.find_map(|key| { - let key = key.ok()?; - let chain_record: ChainRecord = chain_table.get(key.value()).ok()??.value(); + let key = key.ok()?.value(); + let chain_record: ChainRecord = chain_table.get(key).ok()??.value(); if !chain_record.orphaned { - Some(Hash::<32>::from(chain_record.hash.as_slice())) + Some(Hash::<32>::from(chain_record.prev_hash.as_slice())) } else { None } @@ -368,9 +424,88 @@ impl RedbBlockStore { return Ok(prev_hash); } } - + Err(Error::DataNotFound) } + + fn redb_save_slots(&mut self, epoch: u64, pool_id: &str, slot_qty: u64, slots: &str, hash: &str) -> Result<(), Error> { + // See if record exists already + let mut hasher = Hasher::<224>::new(); + hasher.input(&epoch.to_be_bytes()); + hasher.input(hex::decode(pool_id)?.as_slice()); + let index_key = hasher.finalize(); + + let read_tx = self.db.begin_read()?; + let slots_key = { + let slots_table_pool_id_epoch_index = read_tx.open_table(SLOTS_TABLE_POOL_ID_EPOCH_INDEX)?; + slots_table_pool_id_epoch_index.get(index_key.as_slice())?.map(|key| key.value()) + }; + + let write_tx = self.db.begin_write()?; + { + let mut slots_table = write_tx.open_table(SLOTS_TABLE)?; + match slots_key { + Some(key) => { + // Update existing record + let mut slots_record: SlotsRecord = slots_table.get(key)?.map(|record| record.value()).ok_or(Error::DataNotFound)?; + slots_record.slot_qty = slot_qty; + slots_record.slots = slots.to_string(); + slots_record.hash = hex::decode(hash)?; + slots_table.insert(key, slots_record)?; + } + None => { + // Add new record and index + let mut slots_table_pool_id_epoch_index = write_tx.open_table(SLOTS_TABLE_POOL_ID_EPOCH_INDEX)?; + let key = Uuid::now_v7().as_u128(); + let slots_record = SlotsRecord { + epoch, + pool_id: hex::decode(pool_id)?, + slot_qty, + slots: slots.to_string(), + hash: hex::decode(hash)?, + }; + slots_table.insert(key, slots_record)?; + slots_table_pool_id_epoch_index.insert(index_key.as_slice(), key)?; + } + } + } + write_tx.commit()?; + + Ok(()) + } + + fn redb_get_current_slots(&mut self, epoch: u64, pool_id: &str) -> Result<(u64, String), Error> { + let mut hasher = Hasher::<224>::new(); + hasher.input(&epoch.to_be_bytes()); + hasher.input(hex::decode(pool_id)?.as_slice()); + let index_key = hasher.finalize(); + + let read_tx = self.db.begin_read()?; + let slots_table_pool_id_epoch_index = read_tx.open_table(SLOTS_TABLE_POOL_ID_EPOCH_INDEX)?; + let slots_key = slots_table_pool_id_epoch_index.get(index_key.as_slice())?.map(|key| key.value()).ok_or(Error::DataNotFound)?; + + let slots_table = read_tx.open_table(SLOTS_TABLE)?; + let slots_record = slots_table.get(slots_key)?.map(|record| record.value()).ok_or(Error::DataNotFound)?; + + Ok((slots_record.slot_qty, hex::encode(slots_record.hash))) + } + + fn redb_get_previous_slots(&mut self, epoch: u64, pool_id: &str) -> Result, Error> { + let mut hasher = Hasher::<224>::new(); + hasher.input(&epoch.to_be_bytes()); + hasher.input(hex::decode(pool_id)?.as_slice()); + let index_key = hasher.finalize(); + + let read_tx = self.db.begin_read()?; + let slots_table_pool_id_epoch_index = read_tx.open_table(SLOTS_TABLE_POOL_ID_EPOCH_INDEX)?; + if let Some(slots_key) = slots_table_pool_id_epoch_index.get(index_key.as_slice())?.map(|key| key.value()) { + let slots_table = read_tx.open_table(SLOTS_TABLE)?; + let slots_record = slots_table.get(slots_key)?.map(|record| record.value()).ok_or(Error::DataNotFound)?; + Ok(Some(slots_record.slots)) + } else { + Ok(None) + } + } } impl BlockStore for RedbBlockStore { @@ -401,4 +536,16 @@ impl BlockStore for RedbBlockStore { fn get_prev_hash_before_slot(&mut self, slot_number: u64) -> Result, blockstore::Error> { Ok(self.redb_get_prev_hash_before_slot(slot_number)?) } + + fn save_slots(&mut self, epoch: u64, pool_id: &str, slot_qty: u64, slots: &str, hash: &str) -> Result<(), blockstore::Error> { + Ok(self.redb_save_slots(epoch, pool_id, slot_qty, slots, hash)?) + } + + fn get_current_slots(&mut self, epoch: u64, pool_id: &str) -> Result<(u64, String), blockstore::Error> { + Ok(self.redb_get_current_slots(epoch, pool_id)?) + } + + fn get_previous_slots(&mut self, epoch: u64, pool_id: &str) -> Result, blockstore::Error> { + Ok(self.redb_get_previous_slots(epoch, pool_id)?) + } } diff --git a/src/nodeclient/blockstore/sqlite.rs b/src/nodeclient/blockstore/sqlite.rs index 6a30a23..d3f0404 100644 --- a/src/nodeclient/blockstore/sqlite.rs +++ b/src/nodeclient/blockstore/sqlite.rs @@ -1,13 +1,13 @@ +use crate::nodeclient::blockstore; +use crate::nodeclient::blockstore::{Block, BlockStore}; +use crate::nodeclient::sync::BlockHeader; use pallas_crypto::hash::{Hash, Hasher}; use pallas_crypto::nonce::generate_rolling_nonce; -use rusqlite::{named_params, Connection}; +use rusqlite::{named_params, Connection, OptionalExtension}; use std::path::Path; use std::str::FromStr; use thiserror::Error; use tracing::{debug, error, info}; -use crate::nodeclient::blockstore; -use crate::nodeclient::blockstore::{Block, BlockStore}; -use crate::nodeclient::sync::BlockHeader; #[derive(Error, Debug)] pub enum Error { @@ -276,7 +276,7 @@ impl SqLiteBlockStore { let eta_v = generate_rolling_nonce(prev_eta_v, &block.eta_vrf_0); // blake2b 224 of node_vkey is the pool_id - let pool_id = hex::encode(Hasher::<224>::hash(&block.node_vkey)); + let pool_id = hex::encode(Hasher::<224>::hash(&block.node_vkey)); insert_stmt.execute(named_params! { ":block_number" : block.block_number, @@ -362,7 +362,7 @@ impl SqLiteBlockStore { let eta_v: Hash<32> = Hash::from_str(&eta_v_hex)?; Ok(eta_v) } - + fn sql_get_prev_hash_before_slot(&mut self, slot_number: u64) -> Result, Error> { let db = &self.db; let prev_hash_hex: String = db.query_row( @@ -373,6 +373,48 @@ impl SqLiteBlockStore { let prev_hash: Hash<32> = Hash::from_str(&prev_hash_hex)?; Ok(prev_hash) } + + fn sql_save_slots(&mut self, epoch: u64, pool_id: &str, slot_qty: u64, slots: &str, hash: &str) -> Result<(), Error> { + let db = &mut self.db; + let tx = db.transaction()?; + { + let mut stmt = tx.prepare("INSERT INTO slots (epoch, pool_id, slot_qty, slots, hash) VALUES (:epoch, :pool_id, :slot_qty, :slots, :hash) ON CONFLICT (epoch,pool_id) DO UPDATE SET slot_qty=excluded.slot_qty, slots=excluded.slots, hash=excluded.hash")?; + stmt.execute(named_params! { + ":epoch" : epoch, + ":pool_id" : pool_id, + ":slot_qty" : slot_qty, + ":slots" : slots, + ":hash" : hash, + })?; + } + tx.commit()?; + Ok(()) + } + + fn sql_get_current_slots(&mut self, epoch: u64, pool_id: &str) -> Result<(u64, String), Error> { + let db = &self.db; + let mut stmt = db.prepare("SELECT slot_qty, hash FROM slots WHERE epoch = :epoch AND pool_id = :pool_id")?; + Ok(stmt.query_row(named_params! { + ":epoch" : epoch, + ":pool_id" : pool_id, + }, |row| { + let slot_qty: u64 = row.get(0)?; + let hash: String = row.get(1)?; + Ok((slot_qty, hash)) + })?) + } + + fn sql_get_previous_slots(&mut self, epoch: u64, pool_id: &str) -> Result, Error> { + let db = &self.db; + let mut stmt = db.prepare("SELECT slots FROM slots WHERE epoch = :epoch AND pool_id = :pool_id")?; + Ok(stmt.query_row(named_params! { + ":epoch" : epoch, + ":pool_id" : pool_id, + }, |row| { + let slots: String = row.get(0)?; + Ok(slots) + }).optional()?) + } } impl BlockStore for SqLiteBlockStore { @@ -403,4 +445,16 @@ impl BlockStore for SqLiteBlockStore { fn get_prev_hash_before_slot(&mut self, slot_number: u64) -> Result, blockstore::Error> { Ok(self.sql_get_prev_hash_before_slot(slot_number)?) } + + fn save_slots(&mut self, epoch: u64, pool_id: &str, slot_qty: u64, slots: &str, hash: &str) -> Result<(), blockstore::Error> { + Ok(self.sql_save_slots(epoch, pool_id, slot_qty, slots, hash)?) + } + + fn get_current_slots(&mut self, epoch: u64, pool_id: &str) -> Result<(u64, String), blockstore::Error> { + Ok(self.sql_get_current_slots(epoch, pool_id)?) + } + + fn get_previous_slots(&mut self, epoch: u64, pool_id: &str) -> Result, blockstore::Error> { + Ok(self.sql_get_previous_slots(epoch, pool_id)?) + } } diff --git a/src/nodeclient/leaderlog/mod.rs b/src/nodeclient/leaderlog/mod.rs index 7befd12..169d454 100644 --- a/src/nodeclient/leaderlog/mod.rs +++ b/src/nodeclient/leaderlog/mod.rs @@ -19,11 +19,10 @@ use pallas_crypto::nonce::generate_epoch_nonce; use pallas_crypto::vrf::{VrfSecretKey, VRF_SECRET_KEY_SIZE}; use pallas_math::math::{ExpOrdering, FixedDecimal, FixedPrecision, DEFAULT_PRECISION}; use rayon::prelude::*; -// use rusqlite::{named_params, Connection, OptionalExtension}; use serde::{Deserialize, Serialize}; use serde_aux::prelude::deserialize_number_from_string; use thiserror::Error; -use tracing::{debug, span, trace, warn, Level}; +use tracing::{debug, error, info, span, trace, Level}; mod deserialize; mod ledgerstate; @@ -140,8 +139,8 @@ struct Slot { struct PooltoolSendSlots { api_key: String, pool_id: String, - epoch: i64, - slot_qty: i64, + epoch: u64, + slot_qty: u64, hash: String, #[serde(skip_serializing_if = "Option::is_none")] override_time: Option, @@ -163,29 +162,6 @@ pub(crate) fn read_vrf_key(vrf_key_path: &Path) -> Result { let buf = BufReader::new(File::open(vrf_key_path)?); Ok(serde_json::from_reader(buf)?) } -// -// fn get_current_slots(db: &Connection, epoch: i64, pool_id: &str) -> Result<(i64, String), rusqlite::Error> { -// db.query_row( -// "SELECT slot_qty, hash FROM slots WHERE epoch = :epoch AND pool_id = :pool_id LIMIT 1", -// named_params! { -// ":epoch": epoch, -// ":pool_id": pool_id, -// }, -// |row| Ok((row.get(0)?, row.get(1)?)), -// ) -// } -// -// fn get_prev_slots(db: &Connection, epoch: i64, pool_id: &str) -> Result, rusqlite::Error> { -// db.query_row( -// "SELECT slots FROM slots WHERE epoch = :epoch AND pool_id = :pool_id LIMIT 1", -// named_params! { -// ":epoch": epoch, -// ":pool_id": pool_id, -// }, -// |row| row.get(0), -// ) -// .optional() -// } fn guess_shelley_transition_epoch(network_magic: u32) -> u64 { match network_magic { @@ -343,23 +319,24 @@ fn is_slot_leader_praos( cert_nat_max: &FixedDecimal, c: &FixedDecimal, ) -> Result { - let span = span!(Level::TRACE, "is_slot_leader_praos"); - let _enter = span.enter(); - debug!("is_slot_leader_praos: {}", slot); let seed: Vec = mk_input_vrf(slot, eta0); - debug!("seed: {}", hex::encode(&seed)); let cert_nat: Hash<64> = vrf_eval_certified(&seed, pool_vrf_skey)?; - debug!("cert_nat: {}", &cert_nat); let cert_leader_vrf: FixedDecimal = vrf_leader_value(cert_nat.as_slice())?; - debug!("cert_leader_vrf: {}", &cert_leader_vrf); let denominator = cert_nat_max - &cert_leader_vrf; let recip_q: FixedDecimal = cert_nat_max / &denominator; - debug!("recip_q: {}", &recip_q); - debug!("c: {}", c); let x: FixedDecimal = -(sigma * c); - debug!("x: {}", &x); - let ordering = x.exp_cmp(1000, 3, &recip_q); + + let span = span!(Level::TRACE, "is_slot_leader_praos"); + let _enter = span.enter(); + trace!("is_slot_leader_praos: {}", slot); + trace!("seed: {}", hex::encode(&seed)); + trace!("cert_nat: {}", &cert_nat); + trace!("cert_leader_vrf: {}", &cert_leader_vrf); + trace!("recip_q: {}", &recip_q); + trace!("c: {}", c); + trace!("x: {}", &x); + Ok(ordering.estimation == ExpOrdering::LT) } @@ -378,21 +355,22 @@ fn is_slot_leader_tpraos( cert_nat_max: &FixedDecimal, c: &FixedDecimal, ) -> Result { + let seed: Vec = mk_seed(slot, eta0); + let cert_nat: FixedDecimal = FixedDecimal::from(vrf_eval_certified(&seed, pool_vrf_skey)?.as_slice()); + let denominator = cert_nat_max - &cert_nat; + let recip_q: FixedDecimal = cert_nat_max / &denominator; + let x: FixedDecimal = -(sigma * c); + let ordering = x.exp_cmp(1000, 3, &recip_q); + let span = span!(Level::TRACE, "is_slot_leader_tpraos"); let _enter = span.enter(); trace!("is_slot_leader: {}", slot); - let seed: Vec = mk_seed(slot, eta0); trace!("seed: {}", hex::encode(&seed)); - let cert_nat: FixedDecimal = FixedDecimal::from(vrf_eval_certified(&seed, pool_vrf_skey)?.as_slice()); trace!("cert_nat: {}", &cert_nat); - let denominator = cert_nat_max - &cert_nat; - let recip_q: FixedDecimal = cert_nat_max / &denominator; trace!("recip_q: {}", &recip_q); trace!("c: {}", c); - let x: FixedDecimal = -(sigma * c); trace!("x: {}", &x); - let ordering = x.exp_cmp(1000, 3, &recip_q); Ok(ordering.estimation == ExpOrdering::LT) } @@ -664,7 +642,7 @@ pub(crate) fn calculate_leader_logs( // Calculate all of our assigned slots in the epoch (in parallel) let assigned_slots = (0..shelley.epoch_length) - // .par_bridge() // <--- use rayon parallel bridge + .par_bridge() // <--- use rayon parallel bridge .map(|slot_in_epoch| first_slot_of_epoch + slot_in_epoch) .filter(|epoch_slot| !is_overlay_slot(&first_slot_of_epoch, epoch_slot, &ledger_info.decentralization)) .filter_map(|leader_slot| match consensus { @@ -723,22 +701,10 @@ pub(crate) fn calculate_leader_logs( let hash = Hasher::<256>::hash(slots.as_bytes()).to_string(); - // FIXME - // db.execute("INSERT INTO slots (epoch,pool_id,slot_qty,slots,hash) VALUES (:epoch,:pool_id,:slot_qty,:slots,:hash) ON CONFLICT (epoch,pool_id) DO UPDATE SET slot_qty=excluded.slot_qty, slots=excluded.slots, hash=excluded.hash", - // named_params! { - // ":epoch" : epoch, - // ":pool_id" : pool_id, - // ":slot_qty" : assigned_slots.len() as i64, - // ":slots" : slots, - // ":hash" : hash - // }, - // )?; + block_store.save_slots(epoch, pool_id, assigned_slots.len() as u64, slots.as_str(), &hash)?; println!("{}", serde_json::to_string_pretty(&leader_log)?); - // FIXME - // db.close().unwrap(); - Ok(()) } @@ -801,100 +767,96 @@ pub(crate) fn send_slots( // check if db_path is a redb database based on magic number let use_redb = is_redb_database(db_path).expect("infallible"); - let mut _block_store: Box = if use_redb { + let mut block_store: Box = if use_redb { Box::new(RedbBlockStore::new(db_path).expect("infallible")) } else { Box::new(SqLiteBlockStore::new(db_path).expect("infallible")) }; - // match read_byron_genesis(byron_genesis) { - // Ok(byron) => { - // debug!("{:?}", byron); - // match read_shelley_genesis(shelley_genesis) { - // Ok(shelley) => { - // debug!("{:?}", shelley); - // match block_store.get_tip_slot_number() { - // Ok(tip_slot_number) => { - // debug!("tip_slot_number: {}", tip_slot_number); - // let tip_time = - // slot_to_naivedatetime(&byron, &shelley, tip_slot_number, shelley_trans_epoch.expect("infallible")) - // .and_utc() - // .timestamp(); - // let system_time = Utc::now().timestamp(); - // if system_time - tip_time < 120 { - // let (epoch, _) = - // get_first_slot_of_epoch(&byron, &shelley, tip_slot_number, shelley_trans_epoch.expect("infallible")); - // debug!("epoch: {}", epoch); - // for pool in pooltool_config.pools.iter() { - // match block_store.get_current_slots(epoch, &pool.pool_id) { - // Ok((slot_qty, hash)) => { - // debug!("slot_qty: {}", slot_qty); - // debug!("hash: {}", &hash); - // match get_prev_slots(&db, epoch - 1, &pool.pool_id) { - // Ok(prev_slots) => { - // let request = serde_json::ser::to_string(&PooltoolSendSlots { - // api_key: pooltool_config.api_key.clone(), - // pool_id: pool.pool_id.clone(), - // epoch, - // slot_qty, - // hash, - // override_time: override_time.clone(), - // prev_slots, - // }) - // .unwrap(); - // info!("Sending: {}", &request); - // match reqwest::blocking::Client::builder().build() { - // Ok(client) => { - // let pooltool_result = client - // .post("https://api.pooltool.io/v0/sendslots") - // .body(request) - // .send(); - // - // match pooltool_result { - // Ok(response) => match response.text() { - // Ok(text) => { - // info!("Pooltool Response: {}", text); - // } - // Err(error) => { - // error!("PoolTool error: {}", error); - // } - // }, - // Err(error) => { - // error!("PoolTool error: {}", error); - // } - // } - // } - // Err(err) => { - // error!("Could not set up the reqwest client!: {}", err) - // } - // } - // } - // Err(error) => { - // error!("Db Error: {}", error) - // } - // } - // } - // Err(error) => { - // error!("Cannot find db record for {},{}: {}", epoch, &pool.pool_id, error) - // } - // } - // } - // } else { - // handle_error("db not fully synced!") - // } - // } - // Err(error) => handle_error(error), - // } - // } - // Err(error) => handle_error(error), - // } - // } - // Err(error) => handle_error(error), - // } - // - // if let Err(error) = db.close() { - // handle_error(format!("db close error: {}", error.1)); - // } + match read_byron_genesis(byron_genesis) { + Ok(byron) => { + debug!("{:?}", byron); + match read_shelley_genesis(shelley_genesis) { + Ok(shelley) => { + debug!("{:?}", shelley); + match block_store.get_tip_slot_number() { + Ok(tip_slot_number) => { + debug!("tip_slot_number: {}", tip_slot_number); + let tip_time = + slot_to_naivedatetime(&byron, &shelley, tip_slot_number, shelley_trans_epoch.expect("infallible")) + .and_utc() + .timestamp(); + let system_time = Utc::now().timestamp(); + if system_time - tip_time < 120 { + let (epoch, _) = + get_first_slot_of_epoch(&byron, &shelley, tip_slot_number, shelley_trans_epoch.expect("infallible")); + debug!("epoch: {}", epoch); + for pool in pooltool_config.pools.iter() { + match block_store.get_current_slots(epoch, &pool.pool_id) { + Ok((slot_qty, hash)) => { + debug!("slot_qty: {}", slot_qty); + debug!("hash: {}", &hash); + match block_store.get_previous_slots(epoch - 1, &pool.pool_id) { + Ok(prev_slots) => { + let request = serde_json::ser::to_string(&PooltoolSendSlots { + api_key: pooltool_config.api_key.clone(), + pool_id: pool.pool_id.clone(), + epoch, + slot_qty, + hash, + override_time: override_time.clone(), + prev_slots, + }) + .unwrap(); + info!("Sending: {}", &request); + match reqwest::blocking::Client::builder().build() { + Ok(client) => { + let pooltool_result = client + .post("https://api.pooltool.io/v0/sendslots") + .body(request) + .send(); + + match pooltool_result { + Ok(response) => match response.text() { + Ok(text) => { + info!("Pooltool Response: {}", text); + } + Err(error) => { + error!("PoolTool error: {}", error); + } + }, + Err(error) => { + error!("PoolTool error: {}", error); + } + } + } + Err(err) => { + error!("Could not set up the reqwest client!: {}", err) + } + } + } + Err(error) => { + error!("Db Error: {}", error) + } + } + } + Err(error) => { + error!("Cannot find db record for {},{}: {}", epoch, &pool.pool_id, error) + } + } + } + } else { + handle_error("db not fully synced!") + } + } + Err(error) => handle_error(error), + } + } + Err(error) => handle_error(error), + } + } + Err(error) => handle_error(error), + } } fn print_status_synced() { diff --git a/src/nodeclient/sync/pooltool.rs b/src/nodeclient/sync/pooltool.rs index e171c7a..4f25813 100644 --- a/src/nodeclient/sync/pooltool.rs +++ b/src/nodeclient/sync/pooltool.rs @@ -246,11 +246,23 @@ impl BlockStore for PoolToolNotifier { Err(Error::Blockstore("Not implemented".to_string())) } - fn get_eta_v_before_slot(&mut self, slot_number: u64) -> Result, Error> { + fn get_eta_v_before_slot(&mut self, _slot_number: u64) -> Result, Error> { Err(Error::Blockstore("Not implemented".to_string())) } - fn get_prev_hash_before_slot(&mut self, slot_number: u64) -> Result, Error> { + fn get_prev_hash_before_slot(&mut self, _slot_number: u64) -> Result, Error> { + Err(Error::Blockstore("Not implemented".to_string())) + } + + fn save_slots(&mut self, _epoch: u64, _pool_id: &str, _slot_qty: u64, _slots: &str, _hash: &str) -> Result<(), Error> { + Err(Error::Blockstore("Not implemented".to_string())) + } + + fn get_current_slots(&mut self, _epoch: u64, _pool_id: &str) -> Result<(u64, String), Error> { + Err(Error::Blockstore("Not implemented".to_string())) + } + + fn get_previous_slots(&mut self, _epoch: u64, _pool_id: &str) -> Result, Error> { Err(Error::Blockstore("Not implemented".to_string())) } }