Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change(state): Add an init function for a standalone ReadStateService #8595

Merged
merged 5 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions zebra-state/src/arbitrary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,6 @@ where
}
}

impl From<SemanticallyVerifiedBlock> for ChainTipBlock {
fn from(prepared: SemanticallyVerifiedBlock) -> Self {
let SemanticallyVerifiedBlock {
block,
hash,
height,
new_outputs: _,
transaction_hashes,
} = prepared;

Self {
hash,
height,
time: block.header.time,
transactions: block.transactions.clone(),
transaction_hashes,
previous_block_hash: block.header.previous_block_hash,
}
}
}

impl SemanticallyVerifiedBlock {
/// Returns a [`ContextuallyVerifiedBlock`] created from this block,
/// with fake zero-valued spent UTXOs.
Expand Down
9 changes: 5 additions & 4 deletions zebra-state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ pub use request::{
};
pub use response::{KnownBlock, MinedTx, ReadResponse, Response};
pub use service::{
chain_tip::{ChainTipChange, LatestChainTip, TipAction},
check, init, spawn_init,
chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip, TipAction},
check, init, init_read_only,
non_finalized_state::NonFinalizedState,
spawn_init, spawn_init_read_only,
watch_receiver::WatchReceiver,
OutputIndex, OutputLocation, TransactionIndex, TransactionLocation,
};
Expand Down Expand Up @@ -76,7 +78,6 @@ pub use response::GetBlockTemplateChainInfo;
#[cfg(any(test, feature = "proptest-impl"))]
pub use service::{
arbitrary::{populated_state, CHAIN_TIP_UPDATE_WAIT_LIMIT},
chain_tip::{ChainTipBlock, ChainTipSender},
finalized_state::{RawBytes, KV, MAX_ON_DISK_HEIGHT},
init_test, init_test_services,
};
Expand All @@ -96,4 +97,4 @@ pub(crate) use config::hidden::{
write_database_format_version_to_disk, write_state_database_format_version_to_disk,
};

pub(crate) use request::ContextuallyVerifiedBlock;
pub use request::ContextuallyVerifiedBlock;
90 changes: 55 additions & 35 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@ use std::{
time::{Duration, Instant},
};

#[cfg(feature = "elasticsearch")]
use elasticsearch::{
auth::Credentials::Basic,
cert::CertificateValidation,
http::transport::{SingleNodeConnectionPool, TransportBuilder},
http::Url,
Elasticsearch,
};

use futures::future::FutureExt;
use tokio::sync::{oneshot, watch};
use tower::{util::BoxService, Service, ServiceExt};
Expand Down Expand Up @@ -319,29 +310,12 @@ impl StateService {
checkpoint_verify_concurrency_limit: usize,
) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
let timer = CodeTimer::start();

#[cfg(feature = "elasticsearch")]
let finalized_state = {
let conn_pool = SingleNodeConnectionPool::new(
Url::parse(config.elasticsearch_url.as_str())
.expect("configured elasticsearch url is invalid"),
);
let transport = TransportBuilder::new(conn_pool)
.cert_validation(CertificateValidation::None)
.auth(Basic(
config.clone().elasticsearch_username,
config.clone().elasticsearch_password,
))
.build()
.expect("elasticsearch transport builder should not fail");
let elastic_db = Some(Elasticsearch::new(transport));

FinalizedState::new(&config, network, elastic_db)
};

#[cfg(not(feature = "elasticsearch"))]
let finalized_state = { FinalizedState::new(&config, network) };

let finalized_state = FinalizedState::new(
&config,
network,
#[cfg(feature = "elasticsearch")]
true,
);
timer.finish(module_path!(), line!(), "opening finalized state database");

let timer = CodeTimer::start();
Expand Down Expand Up @@ -387,7 +361,7 @@ impl StateService {

let read_service = ReadStateService::new(
&finalized_state,
block_write_task,
Some(block_write_task),
non_finalized_state_receiver,
);

Expand Down Expand Up @@ -828,14 +802,14 @@ impl ReadStateService {
/// and a watch channel for updating the shared recent non-finalized chain.
pub(crate) fn new(
finalized_state: &FinalizedState,
block_write_task: Arc<std::thread::JoinHandle<()>>,
block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
non_finalized_state_receiver: watch::Receiver<NonFinalizedState>,
) -> Self {
let read_service = Self {
network: finalized_state.network(),
db: finalized_state.db.clone(),
non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver),
block_write_task: Some(block_write_task),
block_write_task,
};

tracing::debug!("created new read-only state service");
Expand Down Expand Up @@ -1945,6 +1919,52 @@ pub fn init(
)
}

/// Initialize a read state service from the provided [`Config`].
/// Returns a read-only state service,
///
/// Each `network` has its own separate on-disk database.
///
/// To share access to the state, clone the returned [`ReadStateService`].
pub fn init_read_only(
config: Config,
network: &Network,
) -> (
ReadStateService,
ZebraDb,
tokio::sync::watch::Sender<NonFinalizedState>,
) {
let finalized_state = FinalizedState::new_with_debug(
&config,
network,
true,
#[cfg(feature = "elasticsearch")]
false,
true,
);
let (non_finalized_state_sender, non_finalized_state_receiver) =
tokio::sync::watch::channel(NonFinalizedState::new(network));

(
ReadStateService::new(&finalized_state, None, non_finalized_state_receiver),
finalized_state.db.clone(),
non_finalized_state_sender,
)
}

/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
pub fn spawn_init_read_only(
config: Config,
network: &Network,
) -> tokio::task::JoinHandle<(
ReadStateService,
ZebraDb,
tokio::sync::watch::Sender<NonFinalizedState>,
)> {
let network = network.clone();
tokio::task::spawn_blocking(move || init_read_only(config, &network))
}

/// Calls [`init`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a boxed state service,
/// a read state service, and receivers for state chain tip updates.
Expand Down
16 changes: 11 additions & 5 deletions zebra-state/src/service/chain_tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ impl From<ContextuallyVerifiedBlock> for ChainTipBlock {
}
}

impl From<CheckpointVerifiedBlock> for ChainTipBlock {
fn from(finalized: CheckpointVerifiedBlock) -> Self {
let CheckpointVerifiedBlock(SemanticallyVerifiedBlock {
impl From<SemanticallyVerifiedBlock> for ChainTipBlock {
fn from(prepared: SemanticallyVerifiedBlock) -> Self {
let SemanticallyVerifiedBlock {
block,
hash,
height,
new_outputs: _,
transaction_hashes,
..
}) = finalized;
} = prepared;

Self {
hash,
Expand All @@ -128,6 +128,12 @@ impl From<CheckpointVerifiedBlock> for ChainTipBlock {
}
}

impl From<CheckpointVerifiedBlock> for ChainTipBlock {
fn from(CheckpointVerifiedBlock(prepared): CheckpointVerifiedBlock) -> Self {
prepared.into()
}
}

/// A sender for changes to the non-finalized and finalized chain tips.
#[derive(Debug)]
pub struct ChainTipSender {
Expand Down
34 changes: 31 additions & 3 deletions zebra-state/src/service/finalized_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ impl FinalizedState {
pub fn new(
config: &Config,
network: &Network,
#[cfg(feature = "elasticsearch")] elastic_db: Option<elasticsearch::Elasticsearch>,
#[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
) -> Self {
Self::new_with_debug(
config,
network,
false,
#[cfg(feature = "elasticsearch")]
elastic_db,
enable_elastic_db,
false,
)
}
Expand All @@ -162,9 +162,37 @@ impl FinalizedState {
config: &Config,
network: &Network,
debug_skip_format_upgrades: bool,
#[cfg(feature = "elasticsearch")] elastic_db: Option<elasticsearch::Elasticsearch>,
#[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
read_only: bool,
) -> Self {
#[cfg(feature = "elasticsearch")]
let elastic_db = if enable_elastic_db {
use elasticsearch::{
auth::Credentials::Basic,
cert::CertificateValidation,
http::transport::{SingleNodeConnectionPool, TransportBuilder},
http::Url,
Elasticsearch,
};

let conn_pool = SingleNodeConnectionPool::new(
Url::parse(config.elasticsearch_url.as_str())
.expect("configured elasticsearch url is invalid"),
);
let transport = TransportBuilder::new(conn_pool)
.cert_validation(CertificateValidation::None)
.auth(Basic(
config.clone().elasticsearch_username,
config.clone().elasticsearch_password,
))
.build()
.expect("elasticsearch transport builder should not fail");

Some(Elasticsearch::new(transport))
} else {
None
};

let db = ZebraDb::new(
config,
STATE_DATABASE_KIND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn test_raw_rocksdb_column_families_with_network(network: Network) {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

// Snapshot the column family names
Expand Down
4 changes: 2 additions & 2 deletions zebra-state/src/service/finalized_state/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn blocks_with_v5_transactions() -> Result<()> {
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_PARTIAL_CHAIN_PROPTEST_CASES)),
|((chain, count, network, _history_tree) in PreparedChain::default())| {
let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] None);
let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] false);
let mut height = Height(0);
// use `count` to minimize test failures, so they are easier to diagnose
for block in chain.iter().take(count) {
Expand Down Expand Up @@ -65,7 +65,7 @@ fn all_upgrades_and_wrong_commitments_with_fake_activation_heights() -> Result<(
.unwrap_or(DEFAULT_PARTIAL_CHAIN_PROPTEST_CASES)),
|((chain, _count, network, _history_tree) in PreparedChain::default().with_valid_commitments().no_shrink())| {

let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] None);
let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] false);
let mut height = Height(0);
let heartwood_height = NetworkUpgrade::Heartwood.activation_height(&network).unwrap();
let heartwood_height_plus1 = (heartwood_height + 1).unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ fn test_block_and_transaction_data_with_network(network: Network) {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

// Assert that empty databases are the same, regardless of the network.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use zebra_chain::{
amount::NonNegative,
block::{self, arbitrary::allow_all_transparent_coinbase_spends, Block, Height},
fmt::DisplayToDebug,

Check warning on line 10 in zebra-state/src/service/non_finalized_state/tests/prop.rs

View workflow job for this annotation

GitHub Actions / Test stable on windows-latest

unused import: `fmt::DisplayToDebug`
history_tree::{HistoryTree, NonEmptyHistoryTree},
parameters::NetworkUpgrade::*,
parameters::*,
Expand All @@ -20,7 +20,7 @@
request::ContextuallyVerifiedBlock,
service::{
arbitrary::PreparedChain,
finalized_state::FinalizedState,

Check warning on line 23 in zebra-state/src/service/non_finalized_state/tests/prop.rs

View workflow job for this annotation

GitHub Actions / Test stable on windows-latest

unused imports: `Config`, `NonFinalizedState`, `finalized_state::FinalizedState`
non_finalized_state::{Chain, NonFinalizedState},
},
Config,
Expand Down Expand Up @@ -479,7 +479,7 @@
}
))| {
let mut state = NonFinalizedState::new(&network);
let finalized_state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] None);
let finalized_state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] false);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
finalized_state.set_finalized_value_pool(fake_value_pool);
Expand Down
16 changes: 8 additions & 8 deletions zebra-state/src/service/non_finalized_state/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ fn best_chain_wins_for_network(network: Network) -> Result<()> {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

state.commit_new_chain(block2.prepare(), &finalized_state)?;
Expand Down Expand Up @@ -194,7 +194,7 @@ fn finalize_pops_from_best_chain_for_network(network: Network) -> Result<()> {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
Expand Down Expand Up @@ -245,7 +245,7 @@ fn commit_block_extending_best_chain_doesnt_drop_worst_chains_for_network(
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
Expand Down Expand Up @@ -289,7 +289,7 @@ fn shorter_chain_can_be_best_chain_for_network(network: Network) -> Result<()> {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
Expand Down Expand Up @@ -334,7 +334,7 @@ fn longer_chain_with_more_work_wins_for_network(network: Network) -> Result<()>
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
Expand Down Expand Up @@ -378,7 +378,7 @@ fn equal_length_goes_to_more_work_for_network(network: Network) -> Result<()> {
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

let fake_value_pool = ValueBalance::<NonNegative>::fake_populated_pool();
Expand Down Expand Up @@ -426,7 +426,7 @@ fn history_tree_is_updated_for_network_upgrade(
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

state
Expand Down Expand Up @@ -525,7 +525,7 @@ fn commitment_is_validated_for_network_upgrade(network: Network, network_upgrade
&Config::ephemeral(),
&network,
#[cfg(feature = "elasticsearch")]
None,
false,
);

state
Expand Down
Loading
Loading