refactor(object_store): Wrap ObjectStore in Arc instead of Box (#820)
## What ❔

Changes `Box<dyn ObjectStore>` to `Arc<dyn ObjectStore>`.

## Why ❔

`ObjectStore` is meant to be a point of customization in ZK Stack, but with the
current approach the only way to obtain one is via `ObjectStoreFactory`, which
yields objects packed in a `Box`, i.e. each instance is unique.
To make it customizable, `ObjectStore` should be universally shareable (created
once and then cloned cheaply), and this change is the first step toward that.
Later, the `ObjectStoreFactory` type will be removed completely, giving the
initialization code full control over the store type used at initialization.
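
For illustration, here is a minimal sketch (not part of this diff) of the sharing pattern this enables. The helper function name and the `zksync_object_store` import path are assumptions; only `create_store` returning `Arc<dyn ObjectStore>` comes from this PR:

```rust
use std::sync::Arc;

// Assumed import path for the trait and factory defined in `core/lib/object_store`.
use zksync_object_store::{ObjectStore, ObjectStoreFactory};

// Hypothetical helper showing how a single store instance can now be shared.
async fn share_store(factory: &ObjectStoreFactory) {
    // `create_store` now returns `Arc<dyn ObjectStore>` instead of `Box<dyn ObjectStore>`.
    let store: Arc<dyn ObjectStore> = factory.create_store().await;

    // Cloning an `Arc` is a cheap pointer copy, so several components can hold
    // the same instance...
    let store_for_tree = Arc::clone(&store);
    let store_for_prover_gateway = Arc::clone(&store);

    // ...whereas with `Box<dyn ObjectStore>` each consumer needed its own
    // `create_store()` call and thus its own, unique instance.
    let _ = (store_for_tree, store_for_prover_gateway);
}
```

Since the trait is `Send + Sync` (and now explicitly `'static`), the shared `Arc<dyn ObjectStore>` can also be moved into spawned tasks.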

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
- [ ] Spellcheck has been run via `cargo spellcheck --cfg=./spellcheck/era.cfg --code 1`.
popzxc authored Jan 5, 2024
1 parent 2ceb911 commit 1d1b682
Showing 23 changed files with 69 additions and 58 deletions.
4 changes: 3 additions & 1 deletion core/bin/snapshots_creator/src/creator.rs
@@ -1,5 +1,7 @@
//! [`SnapshotCreator`] and tightly related types.
+use std::sync::Arc;
+
use anyhow::Context as _;
use tokio::sync::Semaphore;
use zksync_config::SnapshotsCreatorConfig;
@@ -61,7 +63,7 @@ impl SnapshotProgress {
/// Creator of a single storage snapshot.
#[derive(Debug)]
pub(crate) struct SnapshotCreator {
-pub blob_store: Box<dyn ObjectStore>,
+pub blob_store: Arc<dyn ObjectStore>,
pub master_pool: ConnectionPool,
pub replica_pool: ConnectionPool,
#[cfg(test)]
7 changes: 5 additions & 2 deletions core/bin/snapshots_creator/src/tests.rs
@@ -3,7 +3,10 @@
use std::{
collections::{HashMap, HashSet},
fmt,
-sync::atomic::{AtomicUsize, Ordering},
+sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc,
+},
};

use rand::{thread_rng, Rng};
@@ -58,7 +61,7 @@ impl HandleEvent for TestEventListener {
}

impl SnapshotCreator {
-fn for_tests(blob_store: Box<dyn ObjectStore>, pool: ConnectionPool) -> Self {
+fn for_tests(blob_store: Arc<dyn ObjectStore>, pool: ConnectionPool) -> Self {
Self {
blob_store,
master_pool: pool.clone(),
14 changes: 7 additions & 7 deletions core/lib/object_store/src/raw.rs
@@ -88,7 +88,7 @@ impl error::Error for ObjectStoreError {
///
/// [`StoredObject`]: crate::StoredObject
#[async_trait]
-pub trait ObjectStore: fmt::Debug + Send + Sync {
+pub trait ObjectStore: 'static + fmt::Debug + Send + Sync {
/// Fetches the value for the given key from the given bucket if it exists.
///
/// # Errors
@@ -178,14 +178,14 @@ impl ObjectStoreFactory {
}

/// Creates an [`ObjectStore`].
-pub async fn create_store(&self) -> Box<dyn ObjectStore> {
+pub async fn create_store(&self) -> Arc<dyn ObjectStore> {
match &self.origin {
ObjectStoreOrigin::Config(config) => Self::create_from_config(config).await,
-ObjectStoreOrigin::Mock(store) => Box::new(Arc::clone(store)),
+ObjectStoreOrigin::Mock(store) => Arc::new(Arc::clone(store)),
}
}

-async fn create_from_config(config: &ObjectStoreConfig) -> Box<dyn ObjectStore> {
+async fn create_from_config(config: &ObjectStoreConfig) -> Arc<dyn ObjectStore> {
let gcs_credential_file_path = match config.mode {
ObjectStoreMode::GCSWithCredentialFile => Some(config.gcs_credential_file_path.clone()),
_ => None,
@@ -201,7 +201,7 @@ impl ObjectStoreFactory {
config.max_retries,
)
.await;
-Box::new(store)
+Arc::new(store)
}
ObjectStoreMode::GCSWithCredentialFile => {
tracing::trace!("Initialized GoogleCloudStorage Object store with credential file");
@@ -211,12 +211,12 @@ impl ObjectStoreFactory {
config.max_retries,
)
.await;
-Box::new(store)
+Arc::new(store)
}
ObjectStoreMode::FileBacked => {
tracing::trace!("Initialized FileBacked Object store");
let store = FileBackedObjectStore::new(config.file_backed_base_path.clone()).await;
-Box::new(store)
+Arc::new(store)
}
}
}
@@ -37,7 +37,7 @@ impl BasicWitnessInputProducer {
) -> anyhow::Result<Self> {
Ok(BasicWitnessInputProducer {
connection_pool,
-object_store: store_factory.create_store().await.into(),
+object_store: store_factory.create_store().await,
l2_chain_id,
})
}
6 changes: 4 additions & 2 deletions core/lib/zksync_core/src/eth_sender/aggregator.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
use zksync_config::configs::eth_sender::{ProofLoadingMode, ProofSendingMode, SenderConfig};
use zksync_contracts::BaseSystemContractsHashes;
use zksync_dal::StorageProcessor;
@@ -24,11 +26,11 @@ pub struct Aggregator {
proof_criteria: Vec<Box<dyn L1BatchPublishCriterion>>,
execute_criteria: Vec<Box<dyn L1BatchPublishCriterion>>,
config: SenderConfig,
-blob_store: Box<dyn ObjectStore>,
+blob_store: Arc<dyn ObjectStore>,
}

impl Aggregator {
-pub fn new(config: SenderConfig, blob_store: Box<dyn ObjectStore>) -> Self {
+pub fn new(config: SenderConfig, blob_store: Arc<dyn ObjectStore>) -> Self {
Self {
commit_criteria: vec![
Box::from(NumberCriterion {
4 changes: 2 additions & 2 deletions core/lib/zksync_core/src/lib.rs
@@ -674,7 +674,7 @@ async fn add_state_keeper_to_task_futures<E: L1GasPriceProvider + Send + Sync +
db_config: &DBConfig,
mempool_config: &MempoolConfig,
gas_adjuster: Arc<E>,
-object_store: Box<dyn ObjectStore>,
+object_store: Arc<dyn ObjectStore>,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
let fair_l2_gas_price = state_keeper_config.fair_l2_gas_price;
@@ -793,7 +793,7 @@ async fn run_tree(
merkle_tree_config: &MerkleTreeConfig,
api_config: Option<&MerkleTreeApiConfig>,
operation_manager: &OperationsManagerConfig,
-object_store: Option<Box<dyn ObjectStore>>,
+object_store: Option<Arc<dyn ObjectStore>>,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
let started_at = Instant::now();
5 changes: 3 additions & 2 deletions core/lib/zksync_core/src/metadata_calculator/mod.rs
@@ -3,6 +3,7 @@
use std::{
future::{self, Future},
+sync::Arc,
time::Duration,
};

@@ -81,7 +82,7 @@ impl MetadataCalculatorConfig {
pub struct MetadataCalculator {
tree: GenericAsyncTree,
tree_reader: watch::Sender<Option<AsyncTreeReader>>,
-object_store: Option<Box<dyn ObjectStore>>,
+object_store: Option<Arc<dyn ObjectStore>>,
delayer: Delayer,
health_updater: HealthUpdater,
max_l1_batches_per_iter: usize,
@@ -91,7 +92,7 @@ impl MetadataCalculator {
/// Creates a calculator with the specified `config`.
pub async fn new(
config: MetadataCalculatorConfig,
-object_store: Option<Box<dyn ObjectStore>>,
+object_store: Option<Arc<dyn ObjectStore>>,
) -> Self {
assert!(
config.max_l1_batches_per_iter > 0,
6 changes: 3 additions & 3 deletions core/lib/zksync_core/src/metadata_calculator/tests.rs
@@ -1,6 +1,6 @@
//! Tests for the metadata calculator component life cycle.
-use std::{future::Future, ops, panic, path::Path, time::Duration};
+use std::{future::Future, ops, panic, path::Path, sync::Arc, time::Duration};

use assert_matches::assert_matches;
use itertools::Itertools;
@@ -360,7 +360,7 @@ async fn postgres_backup_recovery_with_excluded_metadata() {
pub(crate) async fn setup_calculator(
db_path: &Path,
pool: &ConnectionPool,
-) -> (MetadataCalculator, Box<dyn ObjectStore>) {
+) -> (MetadataCalculator, Arc<dyn ObjectStore>) {
let store_factory = ObjectStoreFactory::mock();
let store = store_factory.create_store().await;
let (merkle_tree_config, operation_manager) = create_config(db_path, MerkleTreeMode::Full);
@@ -395,7 +395,7 @@ async fn setup_calculator_with_options(
merkle_tree_config: &MerkleTreeConfig,
operation_config: &OperationsManagerConfig,
pool: &ConnectionPool,
-object_store: Option<Box<dyn ObjectStore>>,
+object_store: Option<Arc<dyn ObjectStore>>,
) -> MetadataCalculator {
let calculator_config =
MetadataCalculatorConfig::for_main_node(merkle_tree_config, operation_config);
6 changes: 3 additions & 3 deletions core/lib/zksync_core/src/metadata_calculator/updater.rs
@@ -1,6 +1,6 @@
//! Tree updater trait and its implementations.
-use std::{ops, time::Instant};
+use std::{ops, sync::Arc, time::Instant};

use anyhow::Context as _;
use futures::{future, FutureExt};
@@ -24,14 +24,14 @@ use crate::utils::wait_for_l1_batch;
pub(super) struct TreeUpdater {
tree: AsyncTree,
max_l1_batches_per_iter: usize,
-object_store: Option<Box<dyn ObjectStore>>,
+object_store: Option<Arc<dyn ObjectStore>>,
}

impl TreeUpdater {
pub fn new(
tree: AsyncTree,
max_l1_batches_per_iter: usize,
-object_store: Option<Box<dyn ObjectStore>>,
+object_store: Option<Arc<dyn ObjectStore>>,
) -> Self {
Self {
tree,
4 changes: 2 additions & 2 deletions core/lib/zksync_core/src/proof_data_handler/mod.rs
@@ -1,4 +1,4 @@
-use std::net::SocketAddr;
+use std::{net::SocketAddr, sync::Arc};

use anyhow::Context as _;
use axum::{extract::Path, routing::post, Json, Router};
@@ -34,7 +34,7 @@ fn fri_l1_verifier_config(contracts_config: &ContractsConfig) -> L1VerifierConfi
pub(crate) async fn run_server(
config: ProofDataHandlerConfig,
contracts_config: ContractsConfig,
-blob_store: Box<dyn ObjectStore>,
+blob_store: Arc<dyn ObjectStore>,
pool: ConnectionPool,
mut stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
@@ -65,13 +65,13 @@ impl IntoResponse for RequestProcessorError {

impl RequestProcessor {
pub(crate) fn new(
-blob_store: Box<dyn ObjectStore>,
+blob_store: Arc<dyn ObjectStore>,
pool: ConnectionPool,
config: ProofDataHandlerConfig,
l1_verifier_config: Option<L1VerifierConfig>,
) -> Self {
Self {
-blob_store: Arc::from(blob_store),
+blob_store,
pool,
config,
l1_verifier_config,
4 changes: 2 additions & 2 deletions core/lib/zksync_core/src/state_keeper/io/mempool.rs
@@ -46,7 +46,7 @@ use crate::{
pub(crate) struct MempoolIO {
mempool: MempoolGuard,
pool: ConnectionPool,
-object_store: Box<dyn ObjectStore>,
+object_store: Arc<dyn ObjectStore>,
timeout_sealer: TimeoutSealer,
filter: L2TxFilter,
current_miniblock_number: MiniblockNumber,
@@ -404,7 +404,7 @@ impl MempoolIO {
#[allow(clippy::too_many_arguments)]
pub(in crate::state_keeper) async fn new(
mempool: MempoolGuard,
-object_store: Box<dyn ObjectStore>,
+object_store: Arc<dyn ObjectStore>,
miniblock_sealer_handle: MiniblockSealerHandle,
l1_gas_price_provider: Arc<dyn L1GasPriceProvider>,
pool: ConnectionPool,
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/state_keeper/mod.rs
@@ -42,7 +42,7 @@ pub(crate) async fn create_state_keeper(
mempool: MempoolGuard,
l1_gas_price_provider: Arc<dyn L1GasPriceProvider>,
miniblock_sealer_handle: MiniblockSealerHandle,
-object_store: Box<dyn ObjectStore>,
+object_store: Arc<dyn ObjectStore>,
stop_receiver: watch::Receiver<bool>,
) -> ZkSyncStateKeeper {
let batch_executor_base = MainBatchExecutorBuilder::new(
6 changes: 3 additions & 3 deletions prover/proof_fri_compressor/src/compressor.rs
@@ -1,4 +1,4 @@
-use std::time::Instant;
+use std::{sync::Arc, time::Instant};

use anyhow::Context as _;
use async_trait::async_trait;
@@ -33,7 +33,7 @@ use zksync_vk_setup_data_server_fri::{get_recursive_layer_vk_for_circuit_type, g
use crate::metrics::METRICS;

pub struct ProofCompressor {
-blob_store: Box<dyn ObjectStore>,
+blob_store: Arc<dyn ObjectStore>,
pool: ConnectionPool,
compression_mode: u8,
verify_wrapper_proof: bool,
@@ -42,7 +42,7 @@ pub struct ProofCompressor {

impl ProofCompressor {
pub fn new(
-blob_store: Box<dyn ObjectStore>,
+blob_store: Arc<dyn ObjectStore>,
pool: ConnectionPool,
compression_mode: u8,
verify_wrapper_proof: bool,
8 changes: 4 additions & 4 deletions prover/prover_fri/src/gpu_prover_job_processor.rs
@@ -49,8 +49,8 @@ pub mod gpu_prover {

#[allow(dead_code)]
pub struct Prover {
-blob_store: Box<dyn ObjectStore>,
-public_blob_store: Option<Box<dyn ObjectStore>>,
+blob_store: Arc<dyn ObjectStore>,
+public_blob_store: Option<Arc<dyn ObjectStore>>,
config: Arc<FriProverConfig>,
prover_connection_pool: ConnectionPool,
setup_load_mode: SetupLoadMode,
@@ -66,8 +66,8 @@
impl Prover {
#[allow(dead_code)]
pub fn new(
-blob_store: Box<dyn ObjectStore>,
-public_blob_store: Option<Box<dyn ObjectStore>>,
+blob_store: Arc<dyn ObjectStore>,
+public_blob_store: Option<Arc<dyn ObjectStore>>,
config: FriProverConfig,
prover_connection_pool: ConnectionPool,
setup_load_mode: SetupLoadMode,
6 changes: 3 additions & 3 deletions prover/prover_fri/src/main.rs
@@ -1,5 +1,5 @@
#![feature(generic_const_exprs)]
-use std::future::Future;
+use std::{future::Future, sync::Arc};

use anyhow::Context as _;
use local_ip_address::local_ip;
@@ -170,7 +170,7 @@ async fn get_prover_tasks(
prover_config: FriProverConfig,
stop_receiver: Receiver<bool>,
store_factory: ObjectStoreFactory,
-public_blob_store: Option<Box<dyn ObjectStore>>,
+public_blob_store: Option<Arc<dyn ObjectStore>>,
pool: ConnectionPool,
circuit_ids_for_round_to_be_proven: Vec<CircuitIdRoundTuple>,
) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
@@ -204,7 +204,7 @@ async fn get_prover_tasks(
prover_config: FriProverConfig,
stop_receiver: Receiver<bool>,
store_factory: ObjectStoreFactory,
-public_blob_store: Option<Box<dyn ObjectStore>>,
+public_blob_store: Option<Arc<dyn ObjectStore>>,
pool: ConnectionPool,
circuit_ids_for_round_to_be_proven: Vec<CircuitIdRoundTuple>,
) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
8 changes: 4 additions & 4 deletions prover/prover_fri/src/prover_job_processor.rs
@@ -43,8 +43,8 @@ pub enum SetupLoadMode {
}

pub struct Prover {
-blob_store: Box<dyn ObjectStore>,
-public_blob_store: Option<Box<dyn ObjectStore>>,
+blob_store: Arc<dyn ObjectStore>,
+public_blob_store: Option<Arc<dyn ObjectStore>>,
config: Arc<FriProverConfig>,
prover_connection_pool: ConnectionPool,
setup_load_mode: SetupLoadMode,
@@ -57,8 +57,8 @@ pub struct Prover {
impl Prover {
#[allow(dead_code)]
pub fn new(
-blob_store: Box<dyn ObjectStore>,
-public_blob_store: Option<Box<dyn ObjectStore>>,
+blob_store: Arc<dyn ObjectStore>,
+public_blob_store: Option<Arc<dyn ObjectStore>>,
config: FriProverConfig,
prover_connection_pool: ConnectionPool,
setup_load_mode: SetupLoadMode,
4 changes: 2 additions & 2 deletions prover/prover_fri_gateway/src/api_data_fetcher.rs
@@ -1,4 +1,4 @@
-use std::time::Duration;
+use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use reqwest::Client;
@@ -16,7 +16,7 @@ pub(crate) const PROOF_GENERATION_DATA_PATH: &str = "/proof_generation_data";
pub(crate) const SUBMIT_PROOF_PATH: &str = "/submit_proof";

pub(crate) struct PeriodicApiStruct {
-pub(crate) blob_store: Box<dyn ObjectStore>,
+pub(crate) blob_store: Arc<dyn ObjectStore>,
pub(crate) pool: ConnectionPool,
pub(crate) api_url: String,
pub(crate) poll_duration: Duration,