Skip to content

Commit

Permalink
Add new object_per_epoch_marker_table that includes consensus start…
Browse files Browse the repository at this point in the history
… versions

This introduces the concept of a "FullObjectID" and
"FullObjectKey", which for consensus objects includes the
initial shared version/start version.

Migrates to these new types across the codebase where relevant.
  • Loading branch information
aschran committed Jan 9, 2025
1 parent 80955a5 commit 58cb35e
Show file tree
Hide file tree
Showing 45 changed files with 1,737 additions and 220 deletions.
2 changes: 2 additions & 0 deletions crates/simulacrum/src/store/in_mem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ impl ChildObjectResolver for InMemoryStore {
receiving_object_id: &ObjectID,
receive_object_at_version: SequenceNumber,
_epoch_id: EpochId,
// TODO: Delete this parameter once table migration is complete.
_use_object_per_epoch_marker_table_v2: bool,
) -> sui_types::error::SuiResult<Option<Object>> {
let recv_object = match crate::store::SimulatorStore::get_object(self, receiving_object_id)
{
Expand Down
25 changes: 24 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_objects_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
Expand Down Expand Up @@ -1556,7 +1560,14 @@ impl AuthorityState {
inner_temporary_store,
);
self.get_cache_writer()
.write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())
.write_transaction_outputs(
epoch_store.epoch(),
transaction_outputs.into(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;

if certificate.transaction_data().is_end_of_epoch_tx() {
Expand Down Expand Up @@ -1796,6 +1807,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_object_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

// make a gas object if one was not provided
Expand Down Expand Up @@ -1982,6 +1997,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_object_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

// make a gas object if one was not provided
Expand Down Expand Up @@ -2144,6 +2163,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_object_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

// Create and use a dummy gas object if there is no gas object provided.
Expand Down
11 changes: 6 additions & 5 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use sui_storage::mutex_table::{MutexGuard, MutexTable};
use sui_types::accumulator::Accumulator;
use sui_types::authenticator_state::{get_authenticator_state, ActiveJwk};
use sui_types::base_types::{
AuthorityName, ConsensusObjectSequenceKey, EpochId, ObjectID, SequenceNumber, TransactionDigest,
AuthorityName, ConsensusObjectSequenceKey, EpochId, FullObjectID, ObjectID, SequenceNumber,
TransactionDigest,
};
use sui_types::base_types::{ConciseableName, ObjectRef};
use sui_types::committee::Committee;
Expand Down Expand Up @@ -1437,7 +1438,7 @@ impl AuthorityPerEpochStore {
error: "no assigned shared versions".to_string(),
})?;

let initial_shared_version =
let modified_initial_shared_version =
if self.epoch_start_config().use_version_assignment_tables_v3() {
*initial_shared_version
} else {
Expand All @@ -1447,21 +1448,21 @@ impl AuthorityPerEpochStore {
};
// If we found assigned versions, but they are missing the assignment for
// this object, it indicates a serious inconsistency!
let Some(version) = assigned_shared_versions.get(&(*id, initial_shared_version)) else {
let Some(version) = assigned_shared_versions.get(&(*id, modified_initial_shared_version)) else {
panic!(
"Shared object version should have been assigned. key: {key:?}, \
obj id: {id:?}, initial_shared_version: {initial_shared_version:?}, \
assigned_shared_versions: {assigned_shared_versions:?}",
)
};
InputKey::VersionedObject {
id: *id,
id: FullObjectID::new(*id, Some(initial_shared_version)),
version: *version,
}
}
InputObjectKind::MovePackage(id) => InputKey::Package { id: *id },
InputObjectKind::ImmOrOwnedMoveObject(objref) => InputKey::VersionedObject {
id: objref.0,
id: FullObjectID::new(objref.0, None),
version: objref.1,
},
})
Expand Down
118 changes: 85 additions & 33 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use sui_types::error::UserInputError;
use sui_types::execution::TypeLayoutStore;
use sui_types::message_envelope::Message;
use sui_types::storage::{
get_module, BackingPackageStore, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
get_module, BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone,
ObjectStore,
};
use sui_types::sui_system_state::get_sui_system_state;
use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
Expand Down Expand Up @@ -212,9 +213,12 @@ impl AuthorityStore {
// We can safely delete all entries in the per epoch marker table since this is only called
// at epoch boundaries (during reconfiguration). Therefore any entries that currently
// exist can be removed. Because of this we can use the `schedule_delete_all` method.
self.perpetual_tables
.object_per_epoch_marker_table
.schedule_delete_all()?;
Ok(self
.perpetual_tables
.object_per_epoch_marker_table
.object_per_epoch_marker_table_v2
.schedule_delete_all()?)
}

Expand Down Expand Up @@ -412,40 +416,70 @@ impl AuthorityStore {

pub fn get_marker_value(
&self,
object_id: &ObjectID,
version: &SequenceNumber,
object_key: FullObjectKey,
epoch_id: EpochId,
// TODO: Delete this parameter once table migration is complete.
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult<Option<MarkerValue>> {
let object_key = (epoch_id, ObjectKey(*object_id, *version));
Ok(self
.perpetual_tables
.object_per_epoch_marker_table
.get(&object_key)?)
if use_object_per_epoch_marker_table_v2 {
Ok(self
.perpetual_tables
.object_per_epoch_marker_table_v2
.get(&(epoch_id, object_key))?)
} else {
Ok(self
.perpetual_tables
.object_per_epoch_marker_table
.get(&(epoch_id, object_key.into_object_key()))?)
}
}

pub fn get_latest_marker(
&self,
object_id: &ObjectID,
object_id: FullObjectID,
epoch_id: EpochId,
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
let min_key = (epoch_id, ObjectKey::min_for_id(object_id));
let max_key = (epoch_id, ObjectKey::max_for_id(object_id));
if use_object_per_epoch_marker_table_v2 {
let min_key = (epoch_id, FullObjectKey::min_for_id(&object_id));
let max_key = (epoch_id, FullObjectKey::max_for_id(&object_id));

let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table
.safe_iter_with_bounds(Some(min_key), Some(max_key))
.skip_prior_to(&max_key)?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
// because of the iterator bounds these cannot fail
assert_eq!(epoch, epoch_id);
assert_eq!(key.0, *object_id);
Ok(Some((key.1, marker)))
let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table_v2
.safe_iter_with_bounds(Some(min_key), Some(max_key))
.skip_prior_to(&max_key)?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
// because of the iterator bounds these cannot fail
assert_eq!(epoch, epoch_id);
assert_eq!(key.id(), object_id);
Ok(Some((key.version(), marker)))
}
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
} else {
let min_key = (epoch_id, ObjectKey::min_for_id(&object_id.id()));
let max_key = (epoch_id, ObjectKey::max_for_id(&object_id.id()));

let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table
.safe_iter_with_bounds(Some(min_key), Some(max_key))
.skip_prior_to(&max_key)?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
// because of the iterator bounds these cannot fail
assert_eq!(epoch, epoch_id);
assert_eq!(key.0, object_id.id());
Ok(Some((key.1, marker)))
}
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
}

Expand Down Expand Up @@ -824,6 +858,8 @@ impl AuthorityStore {
&self,
epoch_id: EpochId,
tx_outputs: &[Arc<TransactionOutputs>],
// TODO: Delete this parameter once table migration is complete.
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult {
let mut written = Vec::with_capacity(tx_outputs.len());
for outputs in tx_outputs {
Expand All @@ -834,7 +870,12 @@ impl AuthorityStore {

let mut write_batch = self.perpetual_tables.transactions.batch();
for outputs in tx_outputs {
self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
self.write_one_transaction_outputs(
&mut write_batch,
epoch_id,
outputs,
use_object_per_epoch_marker_table_v2,
)?;
}
// test crashing before writing the batch
fail_point_async!("crash");
Expand All @@ -859,6 +900,8 @@ impl AuthorityStore {
write_batch: &mut DBBatch,
epoch_id: EpochId,
tx_outputs: &TransactionOutputs,
// TODO: Delete this parameter once table migration is complete.
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult {
let TransactionOutputs {
transaction,
Expand All @@ -883,12 +926,21 @@ impl AuthorityStore {
// Add batched writes for objects and locks.
let effects_digest = effects.digest();

write_batch.insert_batch(
&self.perpetual_tables.object_per_epoch_marker_table,
markers
.iter()
.map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
)?;
if use_object_per_epoch_marker_table_v2 {
write_batch.insert_batch(
&self.perpetual_tables.object_per_epoch_marker_table_v2,
markers
.iter()
.map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
)?;
} else {
write_batch.insert_batch(
&self.perpetual_tables.object_per_epoch_marker_table,
markers
.iter()
.map(|(key, marker_value)| ((epoch_id, key.into_object_key()), *marker_value)),
)?;
}

write_batch.insert_batch(
&self.perpetual_tables.objects,
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sui_types::accumulator::Accumulator;
use sui_types::base_types::SequenceNumber;
use sui_types::digests::TransactionEventsDigest;
use sui_types::effects::TransactionEffects;
use sui_types::storage::MarkerValue;
use sui_types::storage::{FullObjectKey, MarkerValue};
use typed_store::metrics::SamplingInterval;
use typed_store::rocks::util::{empty_compaction_filter, reference_count_merge_operator};
use typed_store::rocks::{
Expand Down Expand Up @@ -136,6 +136,7 @@ pub struct AuthorityPerpetualTables {
/// objects that have been deleted. This table is meant to be pruned per-epoch, and all
/// previous epochs other than the current epoch may be pruned safely.
pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,
}

impl AuthorityPerpetualTables {
Expand Down Expand Up @@ -459,6 +460,7 @@ impl AuthorityPerpetualTables {
self.expected_network_sui_amount.unsafe_clear()?;
self.expected_storage_fund_imbalance.unsafe_clear()?;
self.object_per_epoch_marker_table.unsafe_clear()?;
self.object_per_epoch_marker_table_v2.unsafe_clear()?;
self.objects.rocksdb.flush()?;
Ok(())
}
Expand Down
9 changes: 8 additions & 1 deletion crates/sui-core/src/authority/test_authority_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,14 @@ impl<'a> TestAuthorityBuilder<'a> {

state
.get_cache_commit()
.commit_transaction_outputs(epoch_store.epoch(), &[*genesis.transaction().digest()])
.commit_transaction_outputs(
epoch_store.epoch(),
&[*genesis.transaction().digest()],
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;

// We want to insert these objects directly instead of relying on genesis because
Expand Down
18 changes: 16 additions & 2 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,14 @@ impl CheckpointExecutor {
let cache_commit = self.state.get_cache_commit();
debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
cache_commit
.commit_transaction_outputs(epoch_store.epoch(), all_tx_digests)
.commit_transaction_outputs(
epoch_store.epoch(),
all_tx_digests,
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;

epoch_store
Expand Down Expand Up @@ -657,7 +664,14 @@ impl CheckpointExecutor {

let cache_commit = self.state.get_cache_commit();
cache_commit
.commit_transaction_outputs(cur_epoch, &[change_epoch_tx_digest])
.commit_transaction_outputs(
cur_epoch,
&[change_epoch_tx_digest],
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;
fail_point_async!("prune-and-compact");

Expand Down
Loading

0 comments on commit 58cb35e

Please sign in to comment.