Skip to content

Commit

Permalink
feat: correct operation_number in initial_writes; faster insert logs …
Browse files Browse the repository at this point in the history
…using copy_in_raw
  • Loading branch information
tomg10 committed Nov 8, 2023
1 parent a8a31e8 commit e583660
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 13 deletions.
41 changes: 41 additions & 0 deletions core/lib/dal/src/storage_logs_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use sqlx::Row;
use std::{collections::HashMap, time::Instant};

use crate::{instrument::InstrumentExt, StorageProcessor};
use zksync_types::snapshots::SnapshotStorageLog;
use zksync_types::{
get_code_key, AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, StorageLog,
FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H256,
Expand Down Expand Up @@ -68,6 +69,46 @@ impl StorageLogsDal<'_, '_> {
copy.finish().await.unwrap();
}

pub async fn insert_storage_logs_from_snapshot(
&mut self,
miniblock_number: MiniblockNumber,
snapshot_storage_logs: &[SnapshotStorageLog],
) {
let mut copy = self
.storage
.conn()
.copy_in_raw(
"COPY storage_logs(
hashed_key, address, key, value, operation_number, tx_hash, miniblock_number,
created_at, updated_at
)
FROM STDIN WITH (DELIMITER '|')",
)
.await
.unwrap();

let mut buffer = String::new();
let now = Utc::now().naive_utc().to_string();
for log in snapshot_storage_logs.iter() {
write_str!(
&mut buffer,
r"\\x{hashed_key:x}|\\x{address:x}|\\x{key:x}|\\x{value:x}|",
hashed_key = log.key.hashed_key(),
address = log.key.address(),
key = log.key.key(),
value = log.value
);
writeln_str!(
&mut buffer,
r"{}|\\x{:x}|{miniblock_number}|{now}|{now}",
log.enumeration_index,
H256::zero()
);
}
copy.send(buffer.as_bytes()).await.unwrap();
copy.finish().await.unwrap();
}

pub async fn append_storage_logs(
&mut self,
block_number: MiniblockNumber,
Expand Down
34 changes: 34 additions & 0 deletions core/lib/dal/src/storage_logs_dedup_dal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::StorageProcessor;
use sqlx::types::chrono::Utc;
use std::collections::HashSet;
use zksync_types::snapshots::SnapshotStorageLog;
use zksync_types::{AccountTreeId, Address, L1BatchNumber, LogQuery, StorageKey, H256};
use zksync_utils::u256_to_h256;

Expand Down Expand Up @@ -42,6 +43,39 @@ impl StorageLogsDedupDal<'_, '_> {

/// Insert initial writes and assigns indices to them.
/// Assumes indices are already assigned for all saved initial_writes, so must be called only after the migration.
pub async fn insert_initial_writes_from_snapshot(
&mut self,
snapshot_storage_logs: &[SnapshotStorageLog],
) {
let mut copy = self
.storage
.conn()
.copy_in_raw(
"COPY initial_writes (hashed_key, index, l1_batch_number, created_at, updated_at) \
FROM STDIN WITH (DELIMITER '|')",
)
.await
.unwrap();

let mut bytes: Vec<u8> = Vec::new();
let now = Utc::now().naive_utc().to_string();
for log in snapshot_storage_logs.iter() {
let hashed_key_str = format!("\\\\x{}", hex::encode(log.key.hashed_key().0));
let row = format!(
"{}|{}|{}|{}|{}\n",
hashed_key_str,
log.enumeration_index,
log.l1_batch_number_of_initial_write,
now,
now,
);
bytes.extend_from_slice(row.as_bytes());
}
copy.send(bytes).await.unwrap();
copy.finish().await.unwrap();
}

pub async fn insert_initial_writes(
&mut self,
l1_batch_number: L1BatchNumber,
Expand Down
15 changes: 2 additions & 13 deletions core/lib/zksync_core/src/sync_layer/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,31 +280,20 @@ impl<'a, 'b, 'd> SnapshotApplier<'a, 'b, 'd> {
storage_logs: &[SnapshotStorageLog],
storage: &mut StorageProcessor<'_>,
) {
let l1_batch_number = self.snapshot.l1_batch_number;
tracing::info!("Loading {} storage logs into postgres", storage_logs.len());
let storage_logs_keys: Vec<StorageKey> = storage_logs.iter().map(|log| log.key).collect();
storage
.storage_logs_dedup_dal()
.insert_initial_writes(l1_batch_number, &storage_logs_keys)
.insert_initial_writes_from_snapshot(&storage_logs)
.await;
}
async fn sync_storage_logs_chunk(
&mut self,
storage_logs: &[SnapshotStorageLog],
storage: &mut StorageProcessor<'_>,
) {
let miniblock_number = self.snapshot.miniblock_number;
let transformed_logs = storage_logs
.iter()
.map(|log| StorageLog {
kind: StorageLogKind::Write,
key: log.key,
value: log.value,
})
.collect();
storage
.storage_logs_dal()
.append_storage_logs(miniblock_number, &[(H256::zero(), transformed_logs)])
.insert_storage_logs_from_snapshot(self.snapshot.miniblock_number, storage_logs)
.await;
}

Expand Down

0 comments on commit e583660

Please sign in to comment.