Skip to content

Commit

Permalink
feat: PR feedback vol. 1
Browse files Browse the repository at this point in the history
  • Loading branch information
tomg10 committed Nov 16, 2023
1 parent ca6cbd3 commit aa85c78
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 163 deletions.
2 changes: 1 addition & 1 deletion core/bin/snapshots_creator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ node.
Usage (local development):\
First run `zk env dev` \
then the creator can be run using:
`cargo run --bin snapshots_creator --release`
`zk run snapshots_creator`

Snapshot contents are stored, depending on the blob_store configuration, either in the local filesystem or in GCS (Google Cloud Storage).

Expand Down
49 changes: 28 additions & 21 deletions core/bin/snapshots_creator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ use zksync_utils::time::seconds_since_epoch;
#[derive(Debug, Metrics)]
#[metrics(prefix = "snapshots_creator")]
struct SnapshotsCreatorMetrics {
pub storage_logs_chunks_count: Gauge<u64>,
storage_logs_chunks_count: Gauge<u64>,

pub snapshot_generation_duration: Gauge<u64>,
snapshot_generation_duration: Gauge<u64>,

pub snapshot_l1_batch: Gauge<u64>,
snapshot_l1_batch: Gauge<u64>,

pub snapshot_generation_timestamp: Gauge<u64>,
snapshot_generation_timestamp: Gauge<u64>,
}
#[vise::register]
pub(crate) static METRICS: vise::Global<SnapshotsCreatorMetrics> = vise::Global::new();
Expand Down Expand Up @@ -55,7 +55,7 @@ async fn process_storage_logs_single_chunk(
chunk_size: u64,
chunks_count: u64,
) -> anyhow::Result<String> {
let mut conn = pool.access_storage().await?;
let mut conn = pool.access_storage_tagged("snapshots_creator").await?;
let logs = conn
.snapshots_creator_dal()
.get_storage_logs_chunk(l1_batch_number, chunk_id, chunk_size)
Expand Down Expand Up @@ -88,7 +88,7 @@ async fn process_factory_deps(
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<String> {
tracing::info!("Processing factory dependencies");
let mut conn = pool.access_storage().await?;
let mut conn = pool.access_storage_tagged("snapshots_creator").await?;
let factory_deps = conn
.snapshots_creator_dal()
.get_all_factory_deps(miniblock_number)
Expand All @@ -108,7 +108,7 @@ async fn process_factory_deps(
}

async fn run(blob_store: Box<dyn ObjectStore>, pool: ConnectionPool) -> anyhow::Result<()> {
let mut conn = pool.access_storage().await?;
let mut conn = pool.access_storage_tagged("snapshots_creator").await?;
let start_time = seconds_since_epoch();

let l1_batch_number = conn.blocks_dal().get_sealed_l1_batch_number().await? - 1; // we subtract 1 so that after restore, EN node has at least one l1 batch to fetch
Expand All @@ -126,7 +126,7 @@ async fn run(blob_store: Box<dyn ObjectStore>, pool: ConnectionPool) -> anyhow::
return Ok(());
}

let miniblock_number = conn
let last_miniblock_number_in_batch = conn
.blocks_dal()
.get_miniblock_range_of_l1_batch(l1_batch_number)
.await?
Expand All @@ -145,7 +145,7 @@ async fn run(blob_store: Box<dyn ObjectStore>, pool: ConnectionPool) -> anyhow::

tracing::info!(
"Creating snapshot for storage logs up to miniblock {}, l1_batch {}",
miniblock_number,
last_miniblock_number_in_batch,
l1_batch_number.0
);
tracing::info!(
Expand All @@ -154,8 +154,13 @@ async fn run(blob_store: Box<dyn ObjectStore>, pool: ConnectionPool) -> anyhow::
chunk_size
);

let factory_deps_output_file =
process_factory_deps(&*blob_store, &pool, miniblock_number, l1_batch_number).await?;
let factory_deps_output_file = process_factory_deps(
&*blob_store,
&pool,
last_miniblock_number_in_batch,
l1_batch_number,
)
.await?;

let mut storage_logs_output_files = vec![];

Expand All @@ -176,7 +181,7 @@ async fn run(blob_store: Box<dyn ObjectStore>, pool: ConnectionPool) -> anyhow::
storage_logs_output_files.push(output_file.clone());
}

let mut conn = pool.access_storage().await?;
let mut conn = pool.access_storage_tagged("snapshots_creator").await?;

conn.snapshots_dal()
.add_snapshot(
Expand All @@ -195,16 +200,18 @@ async fn run(blob_store: Box<dyn ObjectStore>, pool: ConnectionPool) -> anyhow::
.snapshot_generation_duration
.set(seconds_since_epoch() - start_time);

tracing::info!("Run metrics:");
tracing::info!(
"snapshot_generation_duration: {}s",
METRICS.snapshot_generation_duration.get()
);
tracing::info!("snapshot_l1_batch: {}", METRICS.snapshot_l1_batch.get());
tracing::info!(
"snapshot_generation_timestamp: {}",
METRICS.snapshot_generation_timestamp.get()
);
tracing::info!(
r#"Run metrics:
snapshot_generation_duration: {}sec
snapshot_l1_batch: {},
snapshot_generation_timestamp: {}
storage_logs_chunks_count: {}
"#,
METRICS.snapshot_generation_duration.get(),
METRICS.snapshot_l1_batch.get(),
METRICS.snapshot_generation_timestamp.get(),
"storage_logs_chunks_count: {}",
METRICS.storage_logs_chunks_count.get()
);

Expand Down
196 changes: 94 additions & 102 deletions core/lib/dal/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -350,26 +350,6 @@
},
"query": "\n WITH events_select AS (\n SELECT\n address, topic1, topic2, topic3, topic4, value,\n miniblock_number, tx_hash, tx_index_in_block,\n event_index_in_block, event_index_in_tx\n FROM events\n WHERE miniblock_number > $1\n ORDER BY miniblock_number ASC, event_index_in_block ASC\n )\n SELECT miniblocks.hash as \"block_hash?\",\n address as \"address!\", topic1 as \"topic1!\", topic2 as \"topic2!\", topic3 as \"topic3!\", topic4 as \"topic4!\", value as \"value!\",\n miniblock_number as \"miniblock_number!\", miniblocks.l1_batch_number as \"l1_batch_number?\", tx_hash as \"tx_hash!\",\n tx_index_in_block as \"tx_index_in_block!\", event_index_in_block as \"event_index_in_block!\", event_index_in_tx as \"event_index_in_tx!\"\n FROM events_select\n INNER JOIN miniblocks ON events_select.miniblock_number = miniblocks.number\n ORDER BY miniblock_number ASC, event_index_in_block ASC\n "
},
"05dd7740e61d65a8204f3f39191fbe75bba682c2442e44f9aac28c367e6c47df": {
"describe": {
"columns": [
{
"name": "count",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
null
],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "SELECT count(*) FROM initial_writes WHERE l1_batch_number <= $1"
},
"06d90ea65c1e06bd871f090a0fb0e8772ea5e923f1da5310bedd8dc90e0827f4": {
"describe": {
"columns": [
Expand Down Expand Up @@ -5297,38 +5277,6 @@
},
"query": "SELECT (SELECT l1_batch_number FROM miniblocks WHERE number = $1) as \"block_batch?\", (SELECT MAX(number) + 1 FROM l1_batches) as \"max_batch?\""
},
"5f1a47d24c50743838fe55d7e8933a5a100175ab272b03274a4fa069227ddebb": {
"describe": {
"columns": [
{
"name": "l1_batch_number",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "created_at",
"ordinal": 1,
"type_info": "Timestamp"
},
{
"name": "factory_deps_filepath",
"ordinal": 2,
"type_info": "Text"
}
],
"nullable": [
false,
false,
false
],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "SELECT l1_batch_number, created_at, factory_deps_filepath FROM snapshots WHERE l1_batch_number = $1"
},
"5f40849646bb7436e29cda8fb87fece2a4dcb580644f45ecb82388dece04f222": {
"describe": {
"columns": [
Expand Down Expand Up @@ -5849,36 +5797,6 @@
},
"query": "select MAX(number) from miniblocks where l1_batch_number = $1"
},
"68f8d7f1c9661f9c9fea0be9d40910a3a7e21ae368db46b5f9df3ac3973b7f4e": {
"describe": {
"columns": [
{
"name": "l1_batch_number",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "created_at",
"ordinal": 1,
"type_info": "Timestamp"
},
{
"name": "factory_deps_filepath",
"ordinal": 2,
"type_info": "Text"
}
],
"nullable": [
false,
false,
false
],
"parameters": {
"Left": []
}
},
"query": "SELECT l1_batch_number, created_at, factory_deps_filepath FROM snapshots"
},
"6939e766e122458b2ac618d19b2759c4a7298ef72b81e8c3957e0a5cf35c9552": {
"describe": {
"columns": [
Expand Down Expand Up @@ -8873,6 +8791,26 @@
},
"query": "\n SELECT l1_batch_number, leaf_layer_subqueues_blob_url, aggregation_outputs_blob_url FROM node_aggregation_witness_jobs\n WHERE status='successful'\n AND leaf_layer_subqueues_blob_url is NOT NULL\n AND aggregation_outputs_blob_url is NOT NULL\n AND updated_at < NOW() - INTERVAL '30 days'\n LIMIT $1;\n "
},
"a190719309378ee1912ffedd8180c151aacf17c3ca3bfca8563fa404d587edc8": {
"describe": {
"columns": [
{
"name": "index",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
false
],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "\n SELECT index\n FROM initial_writes\n WHERE l1_batch_number <= $1\n ORDER BY l1_batch_number DESC , index DESC \n LIMIT 1;\n "
},
"a19b7137403c5cdf1be5f5122ce4d297ed661fa8bdb3bc91f8a81fe9da47469e": {
"describe": {
"columns": [
Expand Down Expand Up @@ -9123,26 +9061,6 @@
},
"query": "SELECT MAX(operation_number) as \"max?\" FROM storage_logs WHERE miniblock_number = $1"
},
"a82efd1b7c8823c7ab274ae894e9e6bef3577ebba3e3eef8189a90109c588b65": {
"describe": {
"columns": [
{
"name": "storage_logs_filepaths",
"ordinal": 0,
"type_info": "TextArray"
}
],
"nullable": [
false
],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "SELECT storage_logs_filepaths FROM snapshots WHERE l1_batch_number = $1"
},
"a8b32073a67ad77caab11e73a5cac5aa5b5382648ff95d6787a309eb3f64d434": {
"describe": {
"columns": [],
Expand Down Expand Up @@ -9175,6 +9093,42 @@
},
"query": "SELECT l1_batch_number FROM initial_writes WHERE hashed_key = $1"
},
"a9c38212a31975c54d9782315eed5108eec85af9f4cca9d95971041293cda656": {
"describe": {
"columns": [
{
"name": "l1_batch_number",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "created_at",
"ordinal": 1,
"type_info": "Timestamp"
},
{
"name": "factory_deps_filepath",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "storage_logs_filepaths",
"ordinal": 3,
"type_info": "TextArray"
}
],
"nullable": [
false,
false,
false,
false
],
"parameters": {
"Left": []
}
},
"query": "SELECT l1_batch_number, created_at, factory_deps_filepath, storage_logs_filepaths FROM snapshots"
},
"a9d96d6774af2637173d471f02995652cd4c131c05fdcb3d0e1644bcd1aa1809": {
"describe": {
"columns": [
Expand Down Expand Up @@ -10270,6 +10224,44 @@
},
"query": "SELECT number, l1_tx_count, l2_tx_count, timestamp, is_finished, fee_account_address, l2_to_l1_logs, l2_to_l1_messages, bloom, priority_ops_onchain_data, used_contract_hashes, base_fee_per_gas, l1_gas_price, l2_fair_gas_price, bootloader_code_hash, default_aa_code_hash, protocol_version, compressed_state_diffs, system_logs FROM l1_batches WHERE number = $1"
},
"c5e14ad3393b17ffdbf340b5e6a321d09460cb28ff3e326b9be1e4be2063a0d3": {
"describe": {
"columns": [
{
"name": "l1_batch_number",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "created_at",
"ordinal": 1,
"type_info": "Timestamp"
},
{
"name": "factory_deps_filepath",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "storage_logs_filepaths",
"ordinal": 3,
"type_info": "TextArray"
}
],
"nullable": [
false,
false,
false,
false
],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "SELECT l1_batch_number, created_at, factory_deps_filepath, storage_logs_filepaths FROM snapshots WHERE l1_batch_number = $1"
},
"c604ee1dd86ac154d67ddb339da5f65ca849887d6a1068623e874f9df00cfdd1": {
"describe": {
"columns": [],
Expand Down
11 changes: 8 additions & 3 deletions core/lib/dal/src/snapshots_creator_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ impl SnapshotsCreatorDal<'_, '_> {
l1_batch_number: L1BatchNumber,
) -> Result<u64, sqlx::Error> {
let count = sqlx::query!(
"SELECT count(*) FROM initial_writes WHERE l1_batch_number <= $1",
r#"
SELECT index
FROM initial_writes
WHERE l1_batch_number <= $1
ORDER BY l1_batch_number DESC , index DESC
LIMIT 1;
"#,
l1_batch_number.0 as i32
)
.fetch_one(self.storage.conn())
.await
.unwrap()
.count
.unwrap();
.index;
Ok(count as u64)
}

Expand Down
Loading

0 comments on commit aa85c78

Please sign in to comment.