Skip to content

Commit

Permalink
Don't forget pending blobs if retrying at the next height. (#3238)
Browse files Browse the repository at this point in the history
## Motivation

When a conflicting block is committed, the pending blobs are cleared, so
if the client retries at the next height, it cannot find them anymore.

## Proposal

Always set the pending blobs together with the pending block.

## Test Plan

`test_re_propose_locked_block_with_blobs` was modified to re-propose a
`PublishDataBlob` operation. This fails for me without the fix.

An assertion was added that the pending blobs match the pending block.
This would fail if the blobs were missing.

## Release Plan

- These changes should be ported to the latest `main` branch.
- It could be released in a new SDK patch version.

## Links

- Possibly related: #3029
- [reviewer
checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
  • Loading branch information
afck authored Feb 4, 2025
1 parent 2cbdd05 commit ec049d2
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 57 deletions.
4 changes: 2 additions & 2 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ where
.take(num_new_chains)
.collect();
let certificate = chain_client
.execute_without_prepare(operations)
.execute_without_prepare(operations, Vec::new())
.await?
.expect("should execute block with OpenChain operations");
let executed_block = certificate
Expand Down Expand Up @@ -634,7 +634,7 @@ where
// Put at most 1000 fungible token operations in each block.
for operations in operations.chunks(1000) {
chain_client
.execute_without_prepare(operations.to_vec())
.execute_without_prepare(operations.to_vec(), Vec::new())
.await?
.expect("should execute block with OpenChain operations");
}
Expand Down
20 changes: 16 additions & 4 deletions linera-core/src/client/chain_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
};

Expand Down Expand Up @@ -67,12 +67,12 @@ impl ChainState {
timestamp,
next_block_height,
pending_block: None,
pending_blobs,
pending_blobs: BTreeMap::new(),
received_certificate_trackers: HashMap::new(),
client_mutex: Arc::default(),
};
if let Some(block) = pending_block {
state.set_pending_block(block);
state.set_pending_block(block, pending_blobs.into_values());
}
state
}
Expand All @@ -93,8 +93,20 @@ impl ChainState {
&self.pending_block
}

pub(super) fn set_pending_block(&mut self, block: Block) {
pub(super) fn set_pending_block(
&mut self,
block: Block,
blobs: impl IntoIterator<Item = Blob>,
) {
if block.height == self.next_block_height {
self.pending_blobs.clear();
for blob in blobs {
self.insert_pending_blob(blob);
}
assert_eq!(
block.published_blob_ids(),
self.pending_blobs.keys().copied().collect::<HashSet<_>>()
);
self.pending_block = Some(block);
} else {
tracing::error!(
Expand Down
79 changes: 44 additions & 35 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2029,13 +2029,18 @@ where
// Create the final block proposal.
let key_pair = self.key_pair().await?;
let proposal = if let Some(cert) = manager.requested_locked {
Box::new(BlockProposal::new_retry(round, *cert, &key_pair, blobs))
Box::new(BlockProposal::new_retry(
round,
*cert,
&key_pair,
blobs.clone(),
))
} else {
Box::new(BlockProposal::new_initial(
round,
block.clone(),
&key_pair,
blobs,
blobs.clone(),
))
};
// Check the final block proposal. This will be cheaper after #1401.
Expand All @@ -2044,7 +2049,7 @@ where
.handle_block_proposal(*proposal.clone())
.await?;
// Remember what we are trying to do before sending the proposal to the validators.
self.state_mut().set_pending_block(block);
self.state_mut().set_pending_block(block, blobs);
// Send the query to validators.
let certificate = self
.submit_block_proposal(&committee, proposal, hashed_value)
Expand Down Expand Up @@ -2076,24 +2081,26 @@ where
}

/// Executes a list of operations.
#[instrument(level = "trace", skip(operations))]
#[instrument(level = "trace", skip(operations, blobs))]
pub async fn execute_operations(
&self,
operations: Vec<Operation>,
blobs: Vec<Blob>,
) -> Result<ClientOutcome<Certificate>, ChainClientError> {
self.prepare_chain().await?;
self.execute_without_prepare(operations).await
self.execute_without_prepare(operations, blobs).await
}

/// Executes a list of operations, without calling `prepare_chain`.
#[instrument(level = "trace", skip(operations))]
#[instrument(level = "trace", skip(operations, blobs))]
pub async fn execute_without_prepare(
&self,
operations: Vec<Operation>,
blobs: Vec<Blob>,
) -> Result<ClientOutcome<Certificate>, ChainClientError> {
loop {
// TODO(#2066): Remove boxing once the call-stack is shallower
match Box::pin(self.execute_block(operations.clone())).await? {
match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await? {
ExecuteBlockOutcome::Executed(certificate) => {
return Ok(ClientOutcome::Committed(certificate));
}
Expand All @@ -2116,16 +2123,17 @@ where
&self,
operation: Operation,
) -> Result<ClientOutcome<Certificate>, ChainClientError> {
self.execute_operations(vec![operation]).await
self.execute_operations(vec![operation], vec![]).await
}

/// Executes a new block.
///
/// This must be preceded by a call to `prepare_chain()`.
#[instrument(level = "trace", skip(operations))]
#[instrument(level = "trace", skip(operations, blobs))]
async fn execute_block(
&self,
operations: Vec<Operation>,
blobs: Vec<Blob>,
) -> Result<ExecuteBlockOutcome, ChainClientError> {
#[cfg(with_metrics)]
let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
Expand All @@ -2142,7 +2150,9 @@ where
ClientOutcome::Committed(None) => {}
}
let incoming_bundles = self.pending_message_bundles().await?;
let confirmed_value = self.set_pending_block(incoming_bundles, operations).await?;
let confirmed_value = self
.set_pending_block(incoming_bundles, operations, blobs)
.await?;
match self.process_pending_block_without_prepare().await? {
ClientOutcome::Committed(Some(certificate))
if certificate.value().block() == confirmed_value.inner().block() =>
Expand All @@ -2164,11 +2174,12 @@ where

/// Sets the pending block, so that next time `process_pending_block_without_prepare` is
/// called, it will be proposed to the validators.
#[instrument(level = "trace", skip(incoming_bundles, operations))]
#[instrument(level = "trace", skip(incoming_bundles, operations, blobs))]
async fn set_pending_block(
&self,
incoming_bundles: Vec<IncomingBundle>,
operations: Vec<Operation>,
blobs: Vec<Blob>,
) -> Result<HashedCertificateValue, ChainClientError> {
let identity = self.identity().await?;
let (previous_block_hash, height, timestamp) = {
Expand All @@ -2195,7 +2206,7 @@ where
.stage_block_execution_and_discard_failing_messages(block)
.await?;
self.state_mut()
.set_pending_block(executed_block.block.clone());
.set_pending_block(executed_block.block.clone(), blobs);
Ok(HashedCertificateValue::new_confirmed(executed_block))
}

Expand Down Expand Up @@ -2655,7 +2666,7 @@ where
multi_leader_rounds: ownership.multi_leader_rounds,
timeout_config: ownership.timeout_config,
})];
match self.execute_block(operations).await? {
match self.execute_block(operations, Vec::new()).await? {
ExecuteBlockOutcome::Executed(certificate) => {
return Ok(ClientOutcome::Committed(certificate));
}
Expand Down Expand Up @@ -2719,7 +2730,7 @@ where
application_permissions: application_permissions.clone(),
};
let operation = Operation::System(SystemOperation::OpenChain(config));
let certificate = match self.execute_block(vec![operation]).await? {
let certificate = match self.execute_block(vec![operation], Vec::new()).await? {
ExecuteBlockOutcome::Executed(certificate) => certificate,
ExecuteBlockOutcome::Conflict(_) => continue,
ExecuteBlockOutcome::WaitForTimeout(timeout) => {
Expand Down Expand Up @@ -2774,10 +2785,12 @@ where
service_blob: Blob,
bytecode_id: BytecodeId,
) -> Result<ClientOutcome<(BytecodeId, Certificate)>, ChainClientError> {
self.add_pending_blobs([contract_blob, service_blob]).await;
self.execute_operation(Operation::System(SystemOperation::PublishBytecode {
bytecode_id,
}))
self.execute_operations(
vec![Operation::System(SystemOperation::PublishBytecode {
bytecode_id,
})],
vec![contract_blob, service_blob],
)
.await?
.try_map(|certificate| Ok((bytecode_id, certificate)))
}
Expand All @@ -2797,8 +2810,8 @@ where
})
})
.collect();
self.add_pending_blobs(blobs).await;
self.execute_operations(publish_blob_operations).await
self.execute_operations(publish_blob_operations, blobs.collect())
.await
}

/// Publishes some data blob.
Expand All @@ -2810,13 +2823,6 @@ where
self.publish_data_blobs(vec![bytes]).await
}

/// Adds pending blobs
pub async fn add_pending_blobs(&self, pending_blobs: impl IntoIterator<Item = Blob>) {
for blob in pending_blobs {
self.state_mut().insert_pending_blob(blob);
}
}

/// Creates an application by instantiating some bytecode.
#[instrument(
level = "trace",
Expand Down Expand Up @@ -2898,12 +2904,15 @@ where
self.prepare_chain().await?;
let epoch = self.epoch().await?;
match self
.execute_block(vec![Operation::System(SystemOperation::Admin(
AdminOperation::CreateCommittee {
epoch: epoch.try_add_one()?,
committee: committee.clone(),
},
))])
.execute_block(
vec![Operation::System(SystemOperation::Admin(
AdminOperation::CreateCommittee {
epoch: epoch.try_add_one()?,
committee: committee.clone(),
},
))],
Vec::new(),
)
.await?
{
ExecuteBlockOutcome::Executed(certificate) => {
Expand Down Expand Up @@ -2948,7 +2957,7 @@ where
if incoming_bundles.is_empty() {
return Ok((certificates, None));
}
match self.execute_block(vec![]).await {
match self.execute_block(Vec::new(), Vec::new()).await {
Ok(ExecuteBlockOutcome::Executed(certificate))
| Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
Expand Down Expand Up @@ -3006,7 +3015,7 @@ where
}
})
.collect();
self.execute_without_prepare(operations).await
self.execute_without_prepare(operations, Vec::new()).await
}

/// Sends money to a chain.
Expand Down
24 changes: 10 additions & 14 deletions linera-core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ where

// Since blocks are free of charge on closed chains, empty blocks are not allowed.
assert_matches!(
client1.execute_operations(vec![]).await,
client1.execute_operations(vec![], vec![]).await,
Err(ChainClientError::LocalNodeError(
LocalNodeError::WorkerError(WorkerError::ChainError(error))
)) if matches!(*error, ChainError::ClosedChain)
Expand Down Expand Up @@ -1473,15 +1473,14 @@ where
let blob1 = Blob::new_data(b"blob1".to_vec());
let blob1_hash = blob1.id().hash;

client2_a.add_pending_blobs([blob1]).await;
let blob_0_1_operations = vec![
Operation::System(SystemOperation::ReadBlob { blob_id: blob0_id }),
Operation::System(SystemOperation::PublishDataBlob {
blob_hash: blob1_hash,
}),
];
let b0_result = client2_a
.execute_operations(blob_0_1_operations.clone())
.execute_operations(blob_0_1_operations.clone(), vec![blob1])
.await;

assert!(b0_result.is_err());
Expand Down Expand Up @@ -1617,15 +1616,14 @@ where
let blob1 = Blob::new_data(b"blob1".to_vec());
let blob1_hash = blob1.id().hash;

client2_a.add_pending_blobs([blob1]).await;
let blob_0_1_operations = vec![
Operation::System(SystemOperation::ReadBlob { blob_id: blob0_id }),
Operation::System(SystemOperation::PublishDataBlob {
blob_hash: blob1_hash,
}),
];
let b0_result = client2_a
.execute_operations(blob_0_1_operations.clone())
.execute_operations(blob_0_1_operations.clone(), vec![blob1])
.await;

assert!(b0_result.is_err());
Expand Down Expand Up @@ -1794,15 +1792,14 @@ where
let blob1 = Blob::new_data(b"blob1".to_vec());
let blob1_hash = blob1.id().hash;

client3_a.add_pending_blobs([blob1]).await;
let blob_0_1_operations = vec![
Operation::System(SystemOperation::ReadBlob { blob_id: blob0_id }),
Operation::System(SystemOperation::PublishDataBlob {
blob_hash: blob1_hash,
}),
];
let b0_result = client3_a
.execute_operations(blob_0_1_operations.clone())
.execute_operations(blob_0_1_operations.clone(), vec![blob1])
.await;

assert!(b0_result.is_err());
Expand Down Expand Up @@ -1869,15 +1866,14 @@ where
let blob3 = Blob::new_data(b"blob3".to_vec());
let blob3_hash = blob3.id().hash;

client3_b.add_pending_blobs([blob3]).await;
let blob_2_3_operations = vec![
Operation::System(SystemOperation::ReadBlob { blob_id: blob2_id }),
Operation::System(SystemOperation::PublishDataBlob {
blob_hash: blob3_hash,
}),
];
let b1_result = client3_b
.execute_operations(blob_2_3_operations.clone())
.execute_operations(blob_2_3_operations.clone(), vec![blob3])
.await;
assert!(b1_result.is_err());

Expand Down Expand Up @@ -1930,8 +1926,10 @@ where
builder.set_fault_type([0, 2, 3], FaultType::Honest).await;

client3_c.synchronize_from_validators().await.unwrap();
let blob4_data = b"blob4".to_vec();
let blob4 = Blob::from(BlobContent::new_data(blob4_data.clone()));
let bt_certificate = client3_c
.burn(None, Amount::from_tokens(1))
.publish_data_blob(blob4_data)
.await
.unwrap()
.unwrap();
Expand All @@ -1947,10 +1945,8 @@ where
.block()
.unwrap()
.operations
.contains(&Operation::System(SystemOperation::Transfer {
owner: None,
recipient: Recipient::Burn,
amount: Amount::from_tokens(1),
.contains(&Operation::System(SystemOperation::PublishDataBlob {
blob_hash: blob4.id().hash
})));

// Block before that should be b1
Expand Down
5 changes: 4 additions & 1 deletion linera-core/src/unit_tests/wasm_client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,10 @@ async fn test_memory_fuel_limit(wasm_runtime: WasmRuntime) -> anyhow::Result<()>
.unwrap();

assert!(publisher
.execute_operations(vec![Operation::user(application_id, &increment)?; 10])
.execute_operations(
vec![Operation::user(application_id, &increment)?; 10],
vec![]
)
.await
.is_err());

Expand Down
Loading

0 comments on commit ec049d2

Please sign in to comment.