From 6390ddee2189afeea99d94e9046b4d1ebf3d5cce Mon Sep 17 00:00:00 2001 From: DvirYo-starkware <115620476+DvirYo-starkware@users.noreply.github.com> Date: Thu, 16 May 2024 12:32:09 +0300 Subject: [PATCH] Dvir/write tx to file (#1997) * perf(storage)!: write thin transaction outputs to a file * perf(storage)!: write transactions to a file --- crates/papyrus_storage/src/body/events.rs | 24 ++++- crates/papyrus_storage/src/body/mod.rs | 95 ++++++++++++++++--- crates/papyrus_storage/src/lib.rs | 65 ++++++++++++- .../src/serialization/serializers.rs | 2 + 4 files changed, 163 insertions(+), 23 deletions(-) diff --git a/crates/papyrus_storage/src/body/events.rs b/crates/papyrus_storage/src/body/events.rs index 16321e6e70..a838ccd049 100644 --- a/crates/papyrus_storage/src/body/events.rs +++ b/crates/papyrus_storage/src/body/events.rs @@ -65,7 +65,8 @@ use crate::body::{EventsTable, EventsTableKey, TransactionIndex}; use crate::db::serialization::{NoVersionValueWrapper, VersionZeroWrapper}; use crate::db::table_types::{DbCursor, DbCursorTrait, SimpleTable, Table}; use crate::db::{DbTransaction, RO}; -use crate::{StorageResult, StorageTxn}; +use crate::mmap_file::LocationInFile; +use crate::{FileHandlers, StorageResult, StorageTxn}; /// An identifier of an event. #[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Ord)] @@ -159,6 +160,7 @@ impl EventIterByContractAddress<'_> { /// and finally, by the event index in the transaction output. pub struct EventIterByEventIndex<'txn, 'env> { txn: &'txn DbTransaction<'env, RO>, + file_handlers: &'txn FileHandlers, tx_current: Option, tx_cursor: TransactionOutputsTableCursor<'txn>, events_table: EventsTable<'env>, @@ -206,7 +208,12 @@ impl EventIterByEventIndex<'_, '_> { // There are no more events in the current transaction, so we go over the rest of the // transactions until we find an event. - self.tx_current = self.tx_cursor.next()?; + let Some((tx_index, location)) = self.tx_cursor.next()? else { + self.tx_current = None; + return Ok(()); + }; + self.tx_current = + Some((tx_index, self.file_handlers.get_transaction_output_unchecked(location)?)); self.event_index_in_tx_current = EventIndexInTransactionOutput(0); } @@ -248,12 +255,19 @@ impl<'txn, 'env> StorageTxn<'env, RO> { ) -> StorageResult> { let transaction_outputs_table = self.open_table(&self.tables.transaction_outputs)?; let mut tx_cursor = transaction_outputs_table.cursor(&self.txn)?; - let tx_current = tx_cursor.lower_bound(&event_index.0)?; let events_table = self.open_table(&self.tables.events)?; + let first_txn_location = tx_cursor.lower_bound(&event_index.0)?; + let first_relevant_transaction = match first_txn_location { + None => None, + Some((tx_index, location)) => { + Some((tx_index, self.file_handlers.get_transaction_output_unchecked(location)?)) + } + }; let mut it = EventIterByEventIndex { txn: &self.txn, - tx_current, + file_handlers: &self.file_handlers, + tx_current: first_relevant_transaction, tx_cursor, events_table, event_index_in_tx_current: event_index.1, @@ -476,4 +490,4 @@ type EventsTableCursor<'txn> = type TransactionOutputsKeyValue = (TransactionIndex, ThinTransactionOutput); /// A cursor of the transaction outputs table. type TransactionOutputsTableCursor<'txn> = - DbCursor<'txn, RO, TransactionIndex, VersionZeroWrapper, SimpleTable>; + DbCursor<'txn, RO, TransactionIndex, VersionZeroWrapper, SimpleTable>; diff --git a/crates/papyrus_storage/src/body/mod.rs b/crates/papyrus_storage/src/body/mod.rs index a2c16481cd..bfe68a0f3d 100644 --- a/crates/papyrus_storage/src/body/mod.rs +++ b/crates/papyrus_storage/src/body/mod.rs @@ -63,12 +63,21 @@ use crate::body::events::{EventIndex, ThinTransactionOutput}; use crate::db::serialization::{NoVersionValueWrapper, ValueSerde, VersionZeroWrapper}; use crate::db::table_types::{DbCursorTrait, SimpleTable, Table}; use crate::db::{DbTransaction, TableHandle, TransactionKind, RW}; -use crate::{MarkerKind, MarkersTable, StorageError, StorageResult, StorageScope, StorageTxn}; +use crate::mmap_file::LocationInFile; +use crate::{ + FileHandlers, + MarkerKind, + MarkersTable, + StorageError, + StorageResult, + StorageScope, + StorageTxn, +}; type TransactionsTable<'env> = - TableHandle<'env, TransactionIndex, VersionZeroWrapper, SimpleTable>; + TableHandle<'env, TransactionIndex, VersionZeroWrapper, SimpleTable>; type TransactionOutputsTable<'env> = - TableHandle<'env, TransactionIndex, VersionZeroWrapper, SimpleTable>; + TableHandle<'env, TransactionIndex, VersionZeroWrapper, SimpleTable>; type TransactionHashToIdxTable<'env> = TableHandle<'env, TransactionHash, NoVersionValueWrapper, SimpleTable>; type TransactionIdxToHashTable<'env> = @@ -175,8 +184,11 @@ impl<'env, Mode: TransactionKind> BodyStorageReader for StorageTxn<'env, Mode> { transaction_index: TransactionIndex, ) -> StorageResult> { let transactions_table = self.open_table(&self.tables.transactions)?; - let transaction = transactions_table.get(&self.txn, &transaction_index)?; - Ok(transaction) + let Some(tx_location) = transactions_table.get(&self.txn, &transaction_index)? else { + return Ok(None); + }; + let transaction = self.file_handlers.get_transaction_unchecked(tx_location)?; + Ok(Some(transaction)) } fn get_transaction_output( @@ -184,8 +196,14 @@ impl<'env, Mode: TransactionKind> BodyStorageReader for StorageTxn<'env, Mode> { transaction_index: TransactionIndex, ) -> StorageResult> { let transaction_outputs_table = self.open_table(&self.tables.transaction_outputs)?; - let transaction_output = transaction_outputs_table.get(&self.txn, &transaction_index)?; - Ok(transaction_output) + let Some(tx_output_location) = + transaction_outputs_table.get(&self.txn, &transaction_index)? + else { + return Ok(None); + }; + let transaction_output = + self.file_handlers.get_transaction_output_unchecked(tx_output_location)?; + Ok(Some(transaction_output)) } fn get_transaction_events( @@ -237,7 +255,7 @@ impl<'env, Mode: TransactionKind> BodyStorageReader for StorageTxn<'env, Mode> { block_number: BlockNumber, ) -> StorageResult>> { let transactions_table = self.open_table(&self.tables.transactions)?; - self.get_transactions_in_block(block_number, transactions_table) + self.get_transaction_objects_in_block(block_number, transactions_table) } fn get_block_transaction_hashes( @@ -254,7 +272,7 @@ impl<'env, Mode: TransactionKind> BodyStorageReader for StorageTxn<'env, Mode> { block_number: BlockNumber, ) -> StorageResult>> { let transaction_outputs_table = self.open_table(&self.tables.transaction_outputs)?; - self.get_transactions_in_block(block_number, transaction_outputs_table) + self.get_transaction_outputs_in_block(block_number, transaction_outputs_table) } fn get_block_transactions_count( @@ -312,6 +330,50 @@ impl<'env, Mode: TransactionKind> StorageTxn<'env, Mode> { } Ok(Some(res)) } + + // TODO(dvir): remove this function when we have a general table interface also for values + // written to a file. + // Returns the transaction outputs in the given block. + fn get_transaction_outputs_in_block( + &self, + block_number: BlockNumber, + transaction_output_offsets_table: TransactionOutputsTable<'env>, + ) -> StorageResult>> { + let Some(locations) = + self.get_transactions_in_block(block_number, transaction_output_offsets_table)? + else { + return Ok(None); + }; + + let mut res = Vec::new(); + for location in locations { + res.push(self.file_handlers.get_transaction_output_unchecked(location)?); + } + + Ok(Some(res)) + } + + // TODO(dvir): remove this function when we have a general table interface also for values + // written to a file. + // Returns the transactions in the given block. + fn get_transaction_objects_in_block( + &self, + block_number: BlockNumber, + transaction_offsets_table: TransactionsTable<'env>, + ) -> StorageResult>> { + let Some(locations) = + self.get_transactions_in_block(block_number, transaction_offsets_table)? + else { + return Ok(None); + }; + + let mut res = Vec::new(); + for location in locations { + res.push(self.file_handlers.get_transaction_unchecked(location)?); + } + + Ok(Some(res)) + } } impl<'env> BodyStorageWriter for StorageTxn<'env, RW> { @@ -332,6 +394,7 @@ impl<'env> BodyStorageWriter for StorageTxn<'env, RW> { write_transactions( &block_body, &self.txn, + &self.file_handlers, &transactions_table, &transaction_hash_to_idx_table, &transaction_idx_to_hash_table, @@ -340,6 +403,7 @@ impl<'env> BodyStorageWriter for StorageTxn<'env, RW> { write_transaction_outputs( block_body, &self.txn, + &self.file_handlers, &transaction_outputs_table, &events_table, block_number, @@ -428,6 +492,7 @@ impl<'env> BodyStorageWriter for StorageTxn<'env, RW> { fn write_transactions<'env>( block_body: &BlockBody, txn: &DbTransaction<'env, RW>, + file_handlers: &FileHandlers, transactions_table: &'env TransactionsTable<'env>, transaction_hash_to_idx_table: &'env TransactionHashToIdxTable<'env>, transaction_idx_to_hash_table: &'env TransactionIdxToHashTable<'env>, @@ -445,7 +510,8 @@ fn write_transactions<'env>( tx_hash, transaction_index, )?; - transactions_table.insert(txn, &transaction_index, tx)?; + let location = file_handlers.append_transaction(tx); + transactions_table.insert(txn, &transaction_index, &location)?; } Ok(()) } @@ -453,6 +519,7 @@ fn write_transactions<'env>( fn write_transaction_outputs<'env>( block_body: BlockBody, txn: &DbTransaction<'env, RW>, + file_handlers: &FileHandlers, transaction_outputs_table: &'env TransactionOutputsTable<'env>, events_table: &'env EventsTable<'env>, block_number: BlockNumber, @@ -461,11 +528,9 @@ fn write_transaction_outputs<'env>( let transaction_index = TransactionIndex(block_number, TransactionOffsetInBlock(index)); write_events(&tx_output, txn, events_table, transaction_index)?; - transaction_outputs_table.insert( - txn, - &transaction_index, - &ThinTransactionOutput::from(tx_output), - )?; + let location = + file_handlers.append_transaction_output(&ThinTransactionOutput::from(tx_output)); + transaction_outputs_table.insert(txn, &transaction_index, &location)?; } Ok(()) } diff --git a/crates/papyrus_storage/src/lib.rs b/crates/papyrus_storage/src/lib.rs index 5f08f30fd4..df12f04c9c 100644 --- a/crates/papyrus_storage/src/lib.rs +++ b/crates/papyrus_storage/src/lib.rs @@ -536,8 +536,10 @@ struct_field_names! { state_diffs: TableIdentifier, SimpleTable>, transaction_hash_to_idx: TableIdentifier, SimpleTable>, transaction_idx_to_hash: TableIdentifier, SimpleTable>, - transaction_outputs: TableIdentifier, SimpleTable>, - transactions: TableIdentifier, SimpleTable>, + // TODO(dvir): write transaction also to a file and combine all the mapping from tx_index to the same table. + // Afterward add option to iterate over tx/output also with tx_hash. + transaction_outputs: TableIdentifier, SimpleTable>, + transactions: TableIdentifier, SimpleTable>, // Version tables starknet_version: TableIdentifier, SimpleTable>, @@ -670,6 +672,8 @@ struct FileHandlers { contract_class: FileHandler, Mode>, casm: FileHandler, Mode>, deprecated_contract_class: FileHandler, Mode>, + thin_transaction_output: FileHandler, Mode>, + transaction: FileHandler, Mode>, } impl FileHandlers { @@ -704,6 +708,19 @@ impl FileHandlers { self.casm.flush(); self.deprecated_contract_class.flush(); } + + // Appends a thin transaction output to the corresponding file and returns its location. + fn append_transaction_output( + &self, + transaction_output: &ThinTransactionOutput, + ) -> LocationInFile { + self.clone().thin_transaction_output.append(transaction_output) + } + + // Appends a transaction to the corresponding file and returns its location. + fn append_transaction(&self, transaction: &Transaction) -> LocationInFile { + self.clone().transaction.append(transaction) + } } impl FileHandlers { @@ -714,6 +731,8 @@ impl FileHandlers { ("contract_class".to_string(), self.contract_class.stats()), ("casm".to_string(), self.casm.stats()), ("deprecated_contract_class".to_string(), self.deprecated_contract_class.stats()), + ("thin_transaction_output".to_string(), self.thin_transaction_output.stats()), + ("transaction".to_string(), self.transaction.stats()), ]) } @@ -754,6 +773,24 @@ impl FileHandlers { msg: format!("DeprecatedContractClass at location {:?} not found.", location), }) } + + // Returns the thin transaction output at the given location or an error in case it doesn't + // exist. + fn get_transaction_output_unchecked( + &self, + location: LocationInFile, + ) -> StorageResult { + self.thin_transaction_output.get(location)?.ok_or(StorageError::DBInconsistency { + msg: format!("ThinTransactionOutput at location {:?} not found.", location), + }) + } + + // Returns the transaction at the given location or an error in case it doesn't exist. + fn get_transaction_unchecked(&self, location: LocationInFile) -> StorageResult { + self.transaction.get(location)?.ok_or(StorageError::DBInconsistency { + msg: format!("Transaction at location {:?} not found.", location), + }) + } } fn open_storage_files( @@ -765,6 +802,7 @@ fn open_storage_files( let db_transaction = db_reader.begin_ro_txn()?; let table = db_transaction.open_table(file_offsets_table)?; + // TODO(dvir): consider using a loop here to avoid code duplication. let thin_state_diff_offset = table.get(&db_transaction, &OffsetKind::ThinStateDiff)?.unwrap_or_default(); let (thin_state_diff_writer, thin_state_diff_reader) = open_file( @@ -788,23 +826,40 @@ fn open_storage_files( let deprecated_contract_class_offset = table.get(&db_transaction, &OffsetKind::DeprecatedContractClass)?.unwrap_or_default(); let (deprecated_contract_class_writer, deprecated_contract_class_reader) = open_file( - mmap_file_config, + mmap_file_config.clone(), db_config.path().join("deprecated_contract_class.dat"), deprecated_contract_class_offset, )?; + let transaction_output_offset = + table.get(&db_transaction, &OffsetKind::ThinTransactionOutput)?.unwrap_or_default(); + let (transaction_output_writer, transaction_output_reader) = open_file( + mmap_file_config.clone(), + db_config.path().join("transaction_output.dat"), + transaction_output_offset, + )?; + + let transaction_offset = + table.get(&db_transaction, &OffsetKind::Transaction)?.unwrap_or_default(); + let (transaction_writer, transaction_reader) = + open_file(mmap_file_config, db_config.path().join("transaction.dat"), transaction_offset)?; + Ok(( FileHandlers { thin_state_diff: thin_state_diff_writer, contract_class: contract_class_writer, casm: casm_writer, deprecated_contract_class: deprecated_contract_class_writer, + thin_transaction_output: transaction_output_writer, + transaction: transaction_writer, }, FileHandlers { thin_state_diff: thin_state_diff_reader, contract_class: contract_class_reader, casm: casm_reader, deprecated_contract_class: deprecated_contract_class_reader, + thin_transaction_output: transaction_output_reader, + transaction: transaction_reader, }, )) } @@ -820,6 +875,10 @@ pub enum OffsetKind { Casm, /// A deprecated contract class file. DeprecatedContractClass, + /// A thin transaction output file. + ThinTransactionOutput, + /// A transaction file. + Transaction, } /// A storage query. Used for benchmarking in the storage_benchmark binary. diff --git a/crates/papyrus_storage/src/serialization/serializers.rs b/crates/papyrus_storage/src/serialization/serializers.rs index 102438f69d..bed3c46dee 100644 --- a/crates/papyrus_storage/src/serialization/serializers.rs +++ b/crates/papyrus_storage/src/serialization/serializers.rs @@ -325,6 +325,8 @@ auto_storage_serde! { ContractClass = 1, Casm = 2, DeprecatedContractClass = 3, + ThinTransactionOutput = 4, + Transaction = 5, } pub struct PaymasterData(pub Vec); pub struct PoseidonHash(pub StarkFelt);