Skip to content

Commit

Permalink
fix: PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
tomg10 committed Dec 5, 2023
1 parent 7564113 commit 502e3a0
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 59 deletions.
4 changes: 2 additions & 2 deletions core/bin/snapshots_creator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ Snapshot contents can be stored based on blob_store config either in local files
## Snapshots format

Each snapshot consists of three types of objects (see
[snapshots.rs](https://github.com/matter-labs/zksync-era/core/lib/types/src/snapshots.rs)) : header, storage logs chunks
and factory deps:
[snapshots.rs](https://github.com/matter-labs/zksync-era/blob/main/core/lib/types/src/snapshots.rs)) : header, storage
logs chunks and factory deps:

- Snapshot Header (currently returned by snapshots namespace of JSON-RPC API)
- Snapshot Storage logs chunks (most likely to be stored in gzipped protobuf files, but this part is still WIP) :
Expand Down
2 changes: 1 addition & 1 deletion core/bin/snapshots_creator/src/chunking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ pub fn get_chunk_hashed_keys_range(chunk_id: u64, chunks_count: u64) -> ([u8; 2]

let start_bytes = (chunk_start as u16).to_be_bytes();
let end_bytes = (chunk_end as u16).to_be_bytes();
return (start_bytes, end_bytes);
(start_bytes, end_bytes)
}
32 changes: 14 additions & 18 deletions core/bin/snapshots_creator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::pin::Pin;
use std::time::Duration;
use tokio::sync::watch;
use tokio::sync::watch::Receiver;
use vise::Unit;
use vise::{Buckets, Gauge, Histogram, Metrics};
use zksync_config::configs::PrometheusConfig;
use zksync_config::{PostgresConfig, SnapshotsCreatorConfig};
Expand All @@ -25,7 +26,6 @@ use zksync_types::snapshots::{
use zksync_types::zkevm_test_harness::zk_evm::zkevm_opcode_defs::decoding::AllowedPcOrImm;
use zksync_types::{L1BatchNumber, MiniblockNumber};
use zksync_utils::ceil_div;
use zksync_utils::time::seconds_since_epoch;

#[derive(Debug, Metrics)]
#[metrics(prefix = "snapshots_creator")]
Expand All @@ -34,15 +34,16 @@ struct SnapshotsCreatorMetrics {

storage_logs_chunks_left_to_process: Gauge<u64>,

snapshot_generation_duration: Gauge<u64>,
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
snapshot_generation_duration: Histogram<Duration>,

snapshot_l1_batch: Gauge<u64>,

#[metrics(buckets = Buckets::LATENCIES)]
storage_logs_processing_durations: Histogram<Duration>,
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
storage_logs_processing_duration: Histogram<Duration>,

#[metrics(buckets = Buckets::LATENCIES)]
factory_deps_processing_durations: Histogram<Duration>,
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
factory_deps_processing_duration: Histogram<Duration>,
}
#[vise::register]
pub(crate) static METRICS: vise::Global<SnapshotsCreatorMetrics> = vise::Global::new();
Expand Down Expand Up @@ -72,7 +73,7 @@ async fn process_storage_logs_single_chunk(
chunks_count: u64,
) -> anyhow::Result<String> {
let (min_hashed_key, max_hashed_key) = get_chunk_hashed_keys_range(chunk_id, chunks_count);
let latency = METRICS.storage_logs_processing_durations.start();
let latency = METRICS.storage_logs_processing_duration.start();
let mut conn = pool.access_storage_tagged("snapshots_creator").await?;
let logs = conn
.snapshots_creator_dal()
Expand Down Expand Up @@ -107,7 +108,7 @@ async fn process_factory_deps(
miniblock_number: MiniblockNumber,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<String> {
let latency = METRICS.factory_deps_processing_durations.start();
let latency = METRICS.factory_deps_processing_duration.start();
let mut conn = pool.access_storage_tagged("snapshots_creator").await?;
let factory_deps = conn
.snapshots_creator_dal()
Expand All @@ -134,12 +135,13 @@ async fn run(
replica_pool: ConnectionPool,
master_pool: ConnectionPool,
) -> anyhow::Result<()> {
let latency = METRICS.snapshot_generation_duration.start();

let config = SnapshotsCreatorConfig::from_env().context("SnapshotsCreatorConfig::from_env")?;

let mut conn = replica_pool
.access_storage_tagged("snapshots_creator")
.await?;
let start_time = seconds_since_epoch();

// we subtract 1 so that after restore, EN node has at least one l1 batch to fetch
let l1_batch_number = conn.blocks_dal().get_sealed_l1_batch_number().await? - 1;
Expand Down Expand Up @@ -200,7 +202,7 @@ async fn run(
let mut tasks =
FuturesUnordered::<Pin<Box<dyn Future<Output = anyhow::Result<String>>>>>::new();
let mut last_chunk_id = 0;
while last_chunk_id < chunks_count || tasks.len() != 0 {
while last_chunk_id < chunks_count || !tasks.is_empty() {
while (tasks.len() as u32) < config.concurrent_queries_count && last_chunk_id < chunks_count
{
tasks.push(Box::pin(process_storage_logs_single_chunk(
Expand Down Expand Up @@ -242,15 +244,9 @@ async fn run(
.await?;

METRICS.snapshot_l1_batch.set(l1_batch_number.0.as_u64());
METRICS
.snapshot_generation_duration
.set(seconds_since_epoch() - start_time);

tracing::info!("Run metrics:");
tracing::info!(
"snapshot_generation_duration: {}s",
METRICS.snapshot_generation_duration.get()
);
let elapsed_sec = latency.observe().as_secs();
tracing::info!("snapshot_generation_duration: {elapsed_sec}s");
tracing::info!("snapshot_l1_batch: {}", METRICS.snapshot_l1_batch.get());
tracing::info!(
"storage_logs_chunks_count: {}",
Expand Down
70 changes: 32 additions & 38 deletions core/lib/dal/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -276,19 +276,19 @@
}
],
"nullable": [
false,
false,
false,
false,
false,
false,
false,
false,
true,
false,
false,
false,
false
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
Expand Down Expand Up @@ -5232,6 +5232,26 @@
},
"query": "INSERT INTO events_queue (l1_batch_number, serialized_events_queue) VALUES ($1, $2)"
},
"61cc5a1564918a34b4235290c421f04c40ef935f72f2c72744a5b741439a966a": {
"describe": {
"columns": [
{
"name": "bytecode",
"ordinal": 0,
"type_info": "Bytea"
}
],
"nullable": [
false
],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "SELECT bytecode FROM factory_deps WHERE miniblock_number <= $1"
},
"6317155050a5dae24ea202cfd54d1e58cc7aeb0bfd4d95aa351f85cff04d3bff": {
"describe": {
"columns": [
Expand Down Expand Up @@ -8780,32 +8800,6 @@
},
"query": "INSERT INTO prover_fri_protocol_versions (id, recursion_scheduler_level_vk_hash, recursion_node_level_vk_hash, recursion_leaf_level_vk_hash, recursion_circuits_set_vks_hash, created_at) VALUES ($1, $2, $3, $4, $5, now()) ON CONFLICT(id) DO NOTHING"
},
"a2ac8d74aec70c50cd1919c9d532965bfc38d89bf6379bf649318e55071f1b41": {
"describe": {
"columns": [
{
"name": "bytecode",
"ordinal": 0,
"type_info": "Bytea"
},
{
"name": "bytecode_hash",
"ordinal": 1,
"type_info": "Bytea"
}
],
"nullable": [
false,
false
],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "SELECT bytecode, bytecode_hash FROM factory_deps WHERE miniblock_number <= $1"
},
"a39f760d2cd879a78112e57d8611d7099802b03b7cc4933cafb4c47e133ad543": {
"describe": {
"columns": [
Expand Down

0 comments on commit 502e3a0

Please sign in to comment.