Skip to content

Commit

Permalink
renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
AnastasiiaVashchuk committed Nov 6, 2023
1 parent d6ed84f commit e61cf74
Show file tree
Hide file tree
Showing 23 changed files with 106 additions and 93 deletions.
5 changes: 0 additions & 5 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,6 @@ async fn init_tasks(
.build()
.await
.context("failed to build a tree_pool")?;
// todo: PLA-335
let prover_tree_pool = ServerConnectionPool::singleton(DbVariant::Prover)
.build()
.await
.context("failed to build a prover_tree_pool")?;
let tree_handle =
task::spawn(metadata_calculator.run(tree_pool, prover_tree_pool, tree_stop_receiver));

Expand Down
7 changes: 4 additions & 3 deletions core/lib/zksync_core/src/eth_sender/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use zksync_config::configs::eth_sender::{ProofLoadingMode, ProofSendingMode, SenderConfig};
use zksync_contracts::BaseSystemContractsHashes;
use zksync_object_store::ObjectStore;
use zksync_prover_dal::ProverStorageProcessor;
use zksync_prover_utils::gcs_proof_fetcher::load_wrapped_fri_proofs_for_range;
use zksync_server_dal::ServerStorageProcessor;
use zksync_types::{
Expand Down Expand Up @@ -91,7 +92,7 @@ impl Aggregator {
pub async fn get_next_ready_operation(
&mut self,
storage: &mut ServerStorageProcessor<'_>,
prover_storage: &mut ServerStorageProcessor<'_>,
prover_storage: &mut ProverStorageProcessor<'_>,
base_system_contracts_hashes: BaseSystemContractsHashes,
protocol_version_id: ProtocolVersionId,
l1_verifier_config: L1VerifierConfig,
Expand Down Expand Up @@ -210,7 +211,7 @@ impl Aggregator {

async fn load_real_proof_operation(
storage: &mut ServerStorageProcessor<'_>,
prover_storage: &mut ServerStorageProcessor<'_>,
prover_storage: &mut ProverStorageProcessor<'_>,
l1_verifier_config: L1VerifierConfig,
proof_loading_mode: &ProofLoadingMode,
blob_store: &dyn ObjectStore,
Expand Down Expand Up @@ -317,7 +318,7 @@ impl Aggregator {
async fn get_proof_operation(
&mut self,
storage: &mut ServerStorageProcessor<'_>,
prover_storage: &mut ServerStorageProcessor<'_>,
prover_storage: &mut ProverStorageProcessor<'_>,
limit: usize,
last_sealed_l1_batch: L1BatchNumber,
l1_verifier_config: L1VerifierConfig,
Expand Down
12 changes: 8 additions & 4 deletions core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::sync::watch;
use zksync_config::configs::eth_sender::SenderConfig;
use zksync_contracts::BaseSystemContractsHashes;
use zksync_eth_client::BoundEthInterface;
use zksync_prover_dal::{ProverConnectionPool, ProverStorageProcessor};
use zksync_server_dal::{ServerConnectionPool, ServerStorageProcessor};
use zksync_types::{
aggregated_operations::AggregatedOperation,
Expand Down Expand Up @@ -71,13 +72,16 @@ impl EthTxAggregator {

pub async fn run<E: BoundEthInterface>(
mut self,
pool: ServerConnectionPool,
prover_pool: ServerConnectionPool,
server_pool: ServerConnectionPool,
prover_pool: ProverConnectionPool,
eth_client: E,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
loop {
let mut storage = pool.access_storage_tagged("eth_sender").await.unwrap();
let mut storage = server_pool
.access_storage_tagged("eth_sender")
.await
.unwrap();
let mut prover_storage = prover_pool
.access_storage_tagged("eth_sender")
.await
Expand Down Expand Up @@ -345,7 +349,7 @@ impl EthTxAggregator {
async fn loop_iteration<E: BoundEthInterface>(
&mut self,
storage: &mut ServerStorageProcessor<'_>,
prover_storage: &mut ServerStorageProcessor<'_>,
prover_storage: &mut ProverStorageProcessor<'_>,
eth_client: &E,
) -> Result<(), ETHSenderError> {
let MulticallData {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::time::Duration;

use async_trait::async_trait;
use zksync_prover_dal::ProverConnectionPool;
use zksync_prover_utils::periodic_job::PeriodicJob;
use zksync_server_dal::ServerConnectionPool;

#[derive(Debug)]
pub struct FriProofCompressorJobRetryManager {
pool: ServerConnectionPool,
pool: ProverConnectionPool,
max_attempts: u32,
processing_timeout: Duration,
retry_interval_ms: u64,
Expand All @@ -17,7 +17,7 @@ impl FriProofCompressorJobRetryManager {
max_attempts: u32,
processing_timeout: Duration,
retry_interval_ms: u64,
pool: ServerConnectionPool,
pool: ProverConnectionPool,
) -> Self {
Self {
max_attempts,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use zksync_server_dal::ServerConnectionPool;
use zksync_prover_dal::ProverConnectionPool;
use zksync_types::proofs::JobCountStatistics;

use zksync_prover_utils::periodic_job::PeriodicJob;
Expand All @@ -9,18 +9,18 @@ const PROOF_COMPRESSOR_SERVICE_NAME: &str = "proof_compressor";
#[derive(Debug)]
pub struct FriProofCompressorStatsReporter {
reporting_interval_ms: u64,
pool: ServerConnectionPool,
pool: ProverConnectionPool,
}

impl FriProofCompressorStatsReporter {
pub fn new(reporting_interval_ms: u64, pool: ServerConnectionPool) -> Self {
pub fn new(reporting_interval_ms: u64, pool: ProverConnectionPool) -> Self {
Self {
reporting_interval_ms,
pool,
}
}

async fn get_job_statistics(pool: &ServerConnectionPool) -> JobCountStatistics {
async fn get_job_statistics(pool: &ProverConnectionPool) -> JobCountStatistics {
pool.access_storage()
.await
.unwrap()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::time::Duration;

use async_trait::async_trait;
use zksync_prover_dal::ProverConnectionPool;
use zksync_prover_utils::periodic_job::PeriodicJob;
use zksync_server_dal::ServerConnectionPool;

#[derive(Debug)]
pub struct FriProverJobRetryManager {
pool: ServerConnectionPool,
pool: ProverConnectionPool,
max_attempts: u32,
processing_timeout: Duration,
retry_interval_ms: u64,
Expand All @@ -17,7 +17,7 @@ impl FriProverJobRetryManager {
max_attempts: u32,
processing_timeout: Duration,
retry_interval_ms: u64,
pool: ServerConnectionPool,
pool: ProverConnectionPool,
) -> Self {
Self {
max_attempts,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use async_trait::async_trait;
use zksync_prover_dal::ProverConnectionPool;
use zksync_prover_utils::periodic_job::PeriodicJob;
use zksync_server_dal::ServerConnectionPool;

#[derive(Debug)]
pub struct FriProverStatsReporter {
reporting_interval_ms: u64,
prover_connection_pool: ServerConnectionPool,
prover_connection_pool: ProverConnectionPool,
}

impl FriProverStatsReporter {
pub fn new(reporting_interval_ms: u64, prover_connection_pool: ServerConnectionPool) -> Self {
pub fn new(reporting_interval_ms: u64, prover_connection_pool: ProverConnectionPool) -> Self {
Self {
reporting_interval_ms,
prover_connection_pool,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use async_trait::async_trait;
use zksync_server_dal::ServerConnectionPool;
use zksync_prover_dal::ProverConnectionPool;

use zksync_prover_utils::periodic_job::PeriodicJob;

#[derive(Debug)]
pub struct SchedulerCircuitQueuer {
queuing_interval_ms: u64,
pool: ServerConnectionPool,
pool: ProverConnectionPool,
}

impl SchedulerCircuitQueuer {
pub fn new(queuing_interval_ms: u64, pool: ServerConnectionPool) -> Self {
pub fn new(queuing_interval_ms: u64, pool: ProverConnectionPool) -> Self {
Self {
queuing_interval_ms,
pool,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::time::Duration;

use async_trait::async_trait;
use zksync_prover_dal::ProverConnectionPool;
use zksync_prover_utils::periodic_job::PeriodicJob;
use zksync_server_dal::ServerConnectionPool;

#[derive(Debug)]
pub struct FriWitnessGeneratorJobRetryManager {
pool: ServerConnectionPool,
pool: ProverConnectionPool,
max_attempts: u32,
processing_timeout: Duration,
retry_interval_ms: u64,
Expand All @@ -17,7 +17,7 @@ impl FriWitnessGeneratorJobRetryManager {
max_attempts: u32,
processing_timeout: Duration,
retry_interval_ms: u64,
pool: ServerConnectionPool,
pool: ProverConnectionPool,
) -> Self {
Self {
max_attempts,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use async_trait::async_trait;
use zksync_server_dal::ServerConnectionPool;
use zksync_prover_dal::ProverConnectionPool;
use zksync_types::proofs::{AggregationRound, JobCountStatistics};

use zksync_prover_utils::periodic_job::PeriodicJob;
Expand All @@ -11,11 +11,11 @@ const FRI_WITNESS_GENERATOR_SERVICE_NAME: &str = "fri_witness_generator";
#[derive(Debug)]
pub struct FriWitnessGeneratorStatsReporter {
reporting_interval_ms: u64,
pool: ServerConnectionPool,
pool: ProverConnectionPool,
}

impl FriWitnessGeneratorStatsReporter {
pub fn new(pool: ServerConnectionPool, reporting_interval_ms: u64) -> Self {
pub fn new(pool: ProverConnectionPool, reporting_interval_ms: u64) -> Self {
Self {
reporting_interval_ms,
pool,
Expand Down
22 changes: 13 additions & 9 deletions core/lib/zksync_core/src/house_keeper/gcs_blob_cleaner.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use async_trait::async_trait;
use zksync_object_store::{Bucket, ObjectStore, ObjectStoreError, ObjectStoreFactory};
use zksync_server_dal::ServerConnectionPool;
use zksync_prover_dal::ProverConnectionPool;

use zksync_prover_utils::periodic_job::PeriodicJob;
use zksync_server_dal::ServerConnectionPool;

trait AsBlobUrls {
fn as_blob_urls(&self) -> (&str, Option<&str>);
Expand All @@ -24,7 +25,8 @@ impl AsBlobUrls for (String, String) {
pub struct GcsBlobCleaner {
object_store: Box<dyn ObjectStore>,
cleaning_interval_ms: u64,
pool: ServerConnectionPool,
prover_pool: ProverConnectionPool,
server_pool: ServerConnectionPool,
}

const BATCH_CLEANUP_SIZE: u8 = 5;
Expand All @@ -44,13 +46,15 @@ fn handle_remove_result(result: Result<(), ObjectStoreError>) {
impl GcsBlobCleaner {
pub async fn new(
store_factory: &ObjectStoreFactory,
pool: ServerConnectionPool,
prover_pool: ProverConnectionPool,
server_pool: ServerConnectionPool,
cleaning_interval_ms: u64,
) -> Self {
Self {
object_store: store_factory.create_store().await,
cleaning_interval_ms,
pool,
prover_pool,
server_pool,
}
}

Expand All @@ -63,7 +67,7 @@ impl GcsBlobCleaner {
}

async fn cleanup_prover_jobs_blobs(&self) {
let mut conn = self.pool.access_storage().await.unwrap();
let mut conn = self.prover_pool.access_storage().await.unwrap();
let blob_urls = conn
.prover_dal()
.get_circuit_input_blob_urls_to_be_cleaned(BATCH_CLEANUP_SIZE)
Expand Down Expand Up @@ -92,7 +96,7 @@ impl GcsBlobCleaner {
}

async fn cleanup_witness_inputs_blobs(&self) {
let mut conn = self.pool.access_storage().await.unwrap();
let mut conn = self.server_pool.access_storage().await.unwrap();
let blob_urls = conn
.blocks_dal()
.get_merkle_tree_paths_blob_urls_to_be_cleaned(BATCH_CLEANUP_SIZE)
Expand All @@ -108,7 +112,7 @@ impl GcsBlobCleaner {
}

async fn cleanup_leaf_aggregation_witness_jobs_blobs(&self) {
let mut conn = self.pool.access_storage().await.unwrap();
let mut conn = self.prover_pool.access_storage().await.unwrap();

let blob_urls = conn
.witness_generator_dal()
Expand All @@ -123,7 +127,7 @@ impl GcsBlobCleaner {
}

async fn cleanup_node_aggregation_witness_jobs_blobs(&self) {
let mut conn = self.pool.access_storage().await.unwrap();
let mut conn = self.prover_pool.access_storage().await.unwrap();
let blob_urls = conn
.witness_generator_dal()
.get_leaf_layer_subqueues_and_aggregation_outputs_blob_urls_to_be_cleaned(
Expand All @@ -139,7 +143,7 @@ impl GcsBlobCleaner {
}

async fn cleanup_scheduler_witness_jobs_blobs(&self) {
let mut conn = self.pool.access_storage().await.unwrap();
let mut conn = self.prover_pool.access_storage().await.unwrap();
let blob_urls = conn
.witness_generator_dal()
.get_scheduler_witness_and_node_aggregations_blob_urls_to_be_cleaned(BATCH_CLEANUP_SIZE)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use async_trait::async_trait;
use zksync_server_dal::ServerConnectionPool;
use zksync_prover_dal::ProverConnectionPool;

use zksync_prover_utils::periodic_job::PeriodicJob;

#[derive(Debug)]
pub struct GpuProverQueueMonitor {
synthesizer_per_gpu: u16,
reporting_interval_ms: u64,
prover_connection_pool: ServerConnectionPool,
prover_connection_pool: ProverConnectionPool,
}

impl GpuProverQueueMonitor {
pub fn new(
synthesizer_per_gpu: u16,
reporting_interval_ms: u64,
prover_connection_pool: ServerConnectionPool,
prover_connection_pool: ProverConnectionPool,
) -> Self {
Self {
synthesizer_per_gpu,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use async_trait::async_trait;
use zksync_server_dal::ServerConnectionPool;
use zksync_prover_dal::ProverConnectionPool;

use zksync_prover_utils::periodic_job::PeriodicJob;

Expand All @@ -10,15 +10,15 @@ pub struct ProverJobRetryManager {
max_attempts: u32,
processing_timeout: Duration,
retry_interval_ms: u64,
prover_connection_pool: ServerConnectionPool,
prover_connection_pool: ProverConnectionPool,
}

impl ProverJobRetryManager {
pub fn new(
max_attempts: u32,
processing_timeout: Duration,
retry_interval_ms: u64,
prover_connection_pool: ServerConnectionPool,
prover_connection_pool: ProverConnectionPool,
) -> Self {
Self {
max_attempts,
Expand Down
Loading

0 comments on commit e61cf74

Please sign in to comment.