Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validation round number oracle #3175

Merged
merged 2 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions linera-base/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,14 @@ impl Round {
matches!(self, Round::MultiLeader(_))
}

/// Returns the round number if this is a multi-leader round, `None` otherwise.
pub fn multi_leader(&self) -> Option<u32> {
match self {
Round::MultiLeader(number) => Some(*number),
_ => None,
}
}

/// Whether the round is the fast round.
pub fn is_fast(&self) -> bool {
matches!(self, Round::Fast)
Expand Down Expand Up @@ -763,6 +771,8 @@ pub enum OracleResponse {
Blob(BlobId),
/// An assertion oracle that passed.
Assert,
/// The block's validation round.
Round(Option<u32>),
}

impl Display for OracleResponse {
Expand All @@ -774,6 +784,8 @@ impl Display for OracleResponse {
OracleResponse::Post(bytes) => write!(f, "Post:{}", STANDARD_NO_PAD.encode(bytes))?,
OracleResponse::Blob(blob_id) => write!(f, "Blob:{}", blob_id)?,
OracleResponse::Assert => write!(f, "Assert")?,
OracleResponse::Round(Some(round)) => write!(f, "Round:{round}")?,
OracleResponse::Round(None) => write!(f, "Round:None")?,
};

Ok(())
Expand Down
5 changes: 5 additions & 0 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ where
&mut self,
block: &ProposedBlock,
local_time: Timestamp,
round: Option<u32>,
replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
) -> Result<BlockExecutionOutcome, ChainError> {
#[cfg(with_metrics)]
Expand Down Expand Up @@ -785,6 +786,7 @@ where
posted_message,
incoming_bundle,
block,
round,
txn_index,
local_time,
&mut txn_tracker,
Expand All @@ -803,6 +805,7 @@ where
chain_id,
height: block.height,
index: Some(txn_index),
round,
authenticated_signer: block.authenticated_signer,
authenticated_caller_id: None,
};
Expand Down Expand Up @@ -935,6 +938,7 @@ where
posted_message: &PostedMessage,
incoming_bundle: &IncomingBundle,
block: &ProposedBlock,
round: Option<u32>,
txn_index: u32,
local_time: Timestamp,
txn_tracker: &mut TransactionTracker,
Expand All @@ -946,6 +950,7 @@ where
chain_id: block.chain_id,
is_bouncing: posted_message.is_bouncing(),
height: block.height,
round,
certificate_hash: incoming_bundle.bundle.certificate_hash,
message_id,
authenticated_signer: posted_message.authenticated_signer,
Expand Down
17 changes: 10 additions & 7 deletions linera-chain/src/unit_tests/chain_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ async fn test_block_size_limit() {
recipient: Recipient::root(0),
amount: Amount::ONE,
});
let result = chain.execute_block(&invalid_block, time, None).await;
let result = chain.execute_block(&invalid_block, time, None, None).await;
assert_matches!(
result,
Err(ChainError::ExecutionError(
Expand All @@ -170,7 +170,10 @@ async fn test_block_size_limit() {
);

// The valid block is accepted...
let outcome = chain.execute_block(&valid_block, time, None).await.unwrap();
let outcome = chain
.execute_block(&valid_block, time, None, None)
.await
.unwrap();
let block = Block::new(valid_block, outcome);

// ...because its size is exactly at the allowed limit.
Expand Down Expand Up @@ -231,7 +234,7 @@ async fn test_application_permissions() -> anyhow::Result<()> {
let invalid_block = make_first_block(chain_id)
.with_incoming_bundle(bundle.clone())
.with_simple_transfer(chain_id, Amount::ONE);
let result = chain.execute_block(&invalid_block, time, None).await;
let result = chain.execute_block(&invalid_block, time, None, None).await;
assert_matches!(result, Err(ChainError::AuthorizedApplications(app_ids))
if app_ids == vec![application_id]
);
Expand All @@ -247,7 +250,7 @@ async fn test_application_permissions() -> anyhow::Result<()> {
.with_incoming_bundle(bundle)
.with_operation(app_operation.clone());
let executed_block = chain
.execute_block(&valid_block, time, None)
.execute_block(&valid_block, time, None, None)
.await?
.with(valid_block);
let value = Hashed::new(ConfirmedBlock::new(executed_block));
Expand All @@ -256,14 +259,14 @@ async fn test_application_permissions() -> anyhow::Result<()> {
let invalid_block = make_child_block(&value.clone())
.with_simple_transfer(chain_id, Amount::ONE)
.with_operation(app_operation.clone());
let result = chain.execute_block(&invalid_block, time, None).await;
let result = chain.execute_block(&invalid_block, time, None, None).await;
assert_matches!(result, Err(ChainError::AuthorizedApplications(app_ids))
if app_ids == vec![application_id]
);

// Also, blocks without an application operation or incoming message are forbidden.
let invalid_block = make_child_block(&value.clone());
let result = chain.execute_block(&invalid_block, time, None).await;
let result = chain.execute_block(&invalid_block, time, None, None).await;
assert_matches!(result, Err(ChainError::MissingMandatoryApplications(app_ids))
if app_ids == vec![application_id]
);
Expand All @@ -272,7 +275,7 @@ async fn test_application_permissions() -> anyhow::Result<()> {
application.expect_call(ExpectedCall::execute_operation(|_, _, _| Ok(vec![])));
application.expect_call(ExpectedCall::default_finalize());
let valid_block = make_child_block(&value).with_operation(app_operation);
chain.execute_block(&valid_block, time, None).await?;
chain.execute_block(&valid_block, time, None, None).await?;

Ok(())
}
3 changes: 2 additions & 1 deletion linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,11 +972,12 @@ where
pub async fn stage_block_execution(
&self,
block: ProposedBlock,
round: Option<u32>,
) -> Result<ExecutedBlock, Error> {
Ok(self
.client
.local_node()
.stage_block_execution(block)
.stage_block_execution(block, round)
.await?
.0)
}
Expand Down
9 changes: 7 additions & 2 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ where
/// Execute a block but discard any changes to the chain state.
StageBlockExecution {
block: ProposedBlock,
round: Option<u32>,
#[debug(skip)]
callback: oneshot::Sender<Result<(ExecutedBlock, ChainInfoResponse), WorkerError>>,
},
Expand Down Expand Up @@ -286,8 +287,12 @@ where
} => callback
.send(self.worker.describe_application(application_id).await)
.is_ok(),
ChainWorkerRequest::StageBlockExecution { block, callback } => callback
.send(self.worker.stage_block_execution(block).await)
ChainWorkerRequest::StageBlockExecution {
block,
round,
callback,
} => callback
.send(self.worker.stage_block_execution(block, round).await)
.is_ok(),
ChainWorkerRequest::ProcessTimeout {
certificate,
Expand Down
1 change: 1 addition & 0 deletions linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ where
let verified_outcome = Box::pin(self.state.chain.execute_block(
&executed_block.block,
local_time,
None,
Some(executed_block.outcome.oracle_responses.clone()),
))
.await?;
Expand Down
3 changes: 2 additions & 1 deletion linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,11 @@ where
pub(super) async fn stage_block_execution(
&mut self,
block: ProposedBlock,
round: Option<u32>,
) -> Result<(ExecutedBlock, ChainInfoResponse), WorkerError> {
ChainWorkerStateWithTemporaryChanges::new(self)
.await
.stage_block_execution(block)
.stage_block_execution(block, round)
.await
}

Expand Down
16 changes: 11 additions & 5 deletions linera-core/src/chain_worker/state/temporary_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,15 @@ where
/// Executes a block without persisting any changes to the state.
pub(super) async fn stage_block_execution(
&mut self,
proposal: ProposedBlock,
block: ProposedBlock,
round: Option<u32>,
) -> Result<(ExecutedBlock, ChainInfoResponse), WorkerError> {
let local_time = self.0.storage.clock().current_time();
let signer = proposal.authenticated_signer;
let signer = block.authenticated_signer;

let executed_block = Box::pin(self.0.chain.execute_block(&proposal, local_time, None))
let executed_block = Box::pin(self.0.chain.execute_block(&block, local_time, round, None))
.await?
.with(proposal);
.with(block);

let mut response = ChainInfoResponse::new(&self.0.chain, None);
if let Some(signer) = signer {
Expand Down Expand Up @@ -238,7 +239,12 @@ where
let outcome = if let Some(outcome) = outcome {
outcome.clone()
} else {
Box::pin(self.0.chain.execute_block(block, local_time, None)).await?
Box::pin(
self.0
.chain
.execute_block(block, local_time, round.multi_leader(), None),
)
.await?
};

let executed_block = outcome.with(block.clone());
Expand Down
46 changes: 34 additions & 12 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1903,9 +1903,10 @@ where
async fn stage_block_execution_and_discard_failing_messages(
&self,
mut block: ProposedBlock,
round: Option<u32>,
) -> Result<(ExecutedBlock, ChainInfoResponse), ChainClientError> {
loop {
let result = self.stage_block_execution(block.clone()).await;
let result = self.stage_block_execution(block.clone(), round).await;
if let Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
WorkerError::ChainError(chain_error),
))) = &result
Expand Down Expand Up @@ -1944,12 +1945,13 @@ where
async fn stage_block_execution(
&self,
block: ProposedBlock,
round: Option<u32>,
) -> Result<(ExecutedBlock, ChainInfoResponse), ChainClientError> {
loop {
let result = self
.client
.local_node
.stage_block_execution(block.clone())
.stage_block_execution(block.clone(), round)
.await;
if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
self.receive_certificates_for_blobs(blob_ids.clone())
Expand Down Expand Up @@ -2122,8 +2124,18 @@ where
};
// Make sure every incoming message succeeds and otherwise remove them.
// Also, compute the final certified hash while we're at it.

let info = self.chain_info().await?;
// Use the round number assuming there are oracle responses.
// Using the round number during execution counts as an oracle.
// Accessing the round number in single-leader rounds where we are not the leader
// is not currently supported.
let round = match Self::round_for_new_proposal(&info, &identity, &block, true)? {
Either::Left(round) => round.multi_leader(),
Either::Right(_) => None,
};
let (executed_block, _) = self
.stage_block_execution_and_discard_failing_messages(block)
.stage_block_execution_and_discard_failing_messages(block, round)
.await?;
let blobs = self
.read_local_blobs(executed_block.required_blob_ids())
Expand Down Expand Up @@ -2270,7 +2282,7 @@ where
timestamp,
};
match self
.stage_block_execution_and_discard_failing_messages(block)
.stage_block_execution_and_discard_failing_messages(block, None)
.await
{
Ok((_, response)) => Ok((
Expand Down Expand Up @@ -2424,6 +2436,7 @@ where
{
return self.finalize_locking_block(info).await;
}
let identity = self.identity().await?;

// Otherwise we have to re-propose the highest validated block, if there is one.
let pending: Option<ProposedBlock> = self.state().pending_proposal().clone();
Expand All @@ -2432,18 +2445,28 @@ where
LockingBlock::Regular(certificate) => certificate.block().clone().into(),
LockingBlock::Fast(proposal) => {
let block = proposal.content.block.clone();
self.stage_block_execution(block).await?.0
self.stage_block_execution(block, None).await?.0
}
}
} else if let Some(block) = pending {
// Otherwise we are free to propose our own pending block.
self.stage_block_execution(block).await?.0
// Use the round number assuming there are oracle responses.
// Using the round number during execution counts as an oracle.
let round = match Self::round_for_new_proposal(&info, &identity, &block, true)? {
Either::Left(round) => round.multi_leader(),
Either::Right(_) => None,
};
self.stage_block_execution(block, round).await?.0
} else {
return Ok(ClientOutcome::Committed(None)); // Nothing to do.
};

let identity = self.identity().await?;
let round = match Self::round_for_new_proposal(&info, &identity, &executed_block)? {
let round = match Self::round_for_new_proposal(
&info,
&identity,
&executed_block.block,
executed_block.outcome.has_oracle_responses(),
)? {
Either::Left(round) => round,
Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
};
Expand Down Expand Up @@ -2546,17 +2569,16 @@ where
fn round_for_new_proposal(
info: &ChainInfo,
identity: &Owner,
executed_block: &ExecutedBlock,
block: &ProposedBlock,
has_oracle_responses: bool,
) -> Result<Either<Round, RoundTimeout>, ChainClientError> {
let manager = &info.manager;
let block = &executed_block.block;
// If there is a conflicting proposal in the current round, we can only propose if the
// next round can be started without a timeout, i.e. if we are in a multi-leader round.
// Similarly, we cannot propose a block that uses oracles in the fast round.
let conflict = manager.requested_proposed.as_ref().is_some_and(|proposal| {
proposal.content.round == manager.current_round && proposal.content.block != *block
}) || (manager.current_round.is_fast()
&& executed_block.outcome.has_oracle_responses());
}) || (manager.current_round.is_fast() && has_oracle_responses);
let round = if !conflict {
manager.current_round
} else if let Some(round) = manager
Expand Down
3 changes: 2 additions & 1 deletion linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ where
pub async fn stage_block_execution(
&self,
block: ProposedBlock,
round: Option<u32>,
) -> Result<(ExecutedBlock, ChainInfoResponse), LocalNodeError> {
Ok(self.node.state.stage_block_execution(block).await?)
Ok(self.node.state.stage_block_execution(block, round).await?)
}

/// Reads blobs from storage.
Expand Down
1 change: 1 addition & 0 deletions linera-core/src/unit_tests/wasm_worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ where
authenticated_signer: None,
authenticated_caller_id: None,
height: run_block.height,
round: Some(0),
index: Some(0),
};
let mut controller = ResourceController::default();
Expand Down
Loading
Loading