diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs index 0f846298e0d..cd2e992392a 100644 --- a/linera-client/src/client_context.rs +++ b/linera-client/src/client_context.rs @@ -562,7 +562,7 @@ where .take(num_new_chains) .collect(); let certificate = chain_client - .execute_without_prepare(operations) + .execute_without_prepare(operations, Vec::new()) .await? .expect("should execute block with OpenChain operations"); let executed_block = certificate @@ -634,7 +634,7 @@ where // Put at most 1000 fungible token operations in each block. for operations in operations.chunks(1000) { chain_client - .execute_without_prepare(operations.to_vec()) + .execute_without_prepare(operations.to_vec(), Vec::new()) .await? .expect("should execute block with OpenChain operations"); } diff --git a/linera-core/src/client/chain_state.rs b/linera-core/src/client/chain_state.rs index 5392af48eee..07871100fb9 100644 --- a/linera-core/src/client/chain_state.rs +++ b/linera-core/src/client/chain_state.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, sync::Arc, }; @@ -67,12 +67,12 @@ impl ChainState { timestamp, next_block_height, pending_block: None, - pending_blobs, + pending_blobs: BTreeMap::new(), received_certificate_trackers: HashMap::new(), client_mutex: Arc::default(), }; if let Some(block) = pending_block { - state.set_pending_block(block); + state.set_pending_block(block, pending_blobs.into_values()); } state } @@ -93,8 +93,20 @@ impl ChainState { &self.pending_block } - pub(super) fn set_pending_block(&mut self, block: Block) { + pub(super) fn set_pending_block( + &mut self, + block: Block, + blobs: impl IntoIterator, + ) { if block.height == self.next_block_height { + self.pending_blobs.clear(); + for blob in blobs { + self.insert_pending_blob(blob); + } + assert_eq!( + block.published_blob_ids(), + self.pending_blobs.keys().copied().collect::>() + ); self.pending_block = Some(block); } else { tracing::error!( diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index 21ec2cccb6d..3f9cd5dab9d 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -2029,13 +2029,18 @@ where // Create the final block proposal. let key_pair = self.key_pair().await?; let proposal = if let Some(cert) = manager.requested_locked { - Box::new(BlockProposal::new_retry(round, *cert, &key_pair, blobs)) + Box::new(BlockProposal::new_retry( + round, + *cert, + &key_pair, + blobs.clone(), + )) } else { Box::new(BlockProposal::new_initial( round, block.clone(), &key_pair, - blobs, + blobs.clone(), )) }; // Check the final block proposal. This will be cheaper after #1401. @@ -2044,7 +2049,7 @@ where .handle_block_proposal(*proposal.clone()) .await?; // Remember what we are trying to do before sending the proposal to the validators. - self.state_mut().set_pending_block(block); + self.state_mut().set_pending_block(block, blobs); // Send the query to validators. let certificate = self .submit_block_proposal(&committee, proposal, hashed_value) @@ -2076,24 +2081,26 @@ where } /// Executes a list of operations. - #[instrument(level = "trace", skip(operations))] + #[instrument(level = "trace", skip(operations, blobs))] pub async fn execute_operations( &self, operations: Vec, + blobs: Vec, ) -> Result, ChainClientError> { self.prepare_chain().await?; - self.execute_without_prepare(operations).await + self.execute_without_prepare(operations, blobs).await } /// Executes a list of operations, without calling `prepare_chain`. - #[instrument(level = "trace", skip(operations))] + #[instrument(level = "trace", skip(operations, blobs))] pub async fn execute_without_prepare( &self, operations: Vec, + blobs: Vec, ) -> Result, ChainClientError> { loop { // TODO(#2066): Remove boxing once the call-stack is shallower - match Box::pin(self.execute_block(operations.clone())).await? { + match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await? { ExecuteBlockOutcome::Executed(certificate) => { return Ok(ClientOutcome::Committed(certificate)); } @@ -2116,16 +2123,17 @@ where &self, operation: Operation, ) -> Result, ChainClientError> { - self.execute_operations(vec![operation]).await + self.execute_operations(vec![operation], vec![]).await } /// Executes a new block. /// /// This must be preceded by a call to `prepare_chain()`. - #[instrument(level = "trace", skip(operations))] + #[instrument(level = "trace", skip(operations, blobs))] async fn execute_block( &self, operations: Vec, + blobs: Vec, ) -> Result { #[cfg(with_metrics)] let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency(); @@ -2142,7 +2150,9 @@ where ClientOutcome::Committed(None) => {} } let incoming_bundles = self.pending_message_bundles().await?; - let confirmed_value = self.set_pending_block(incoming_bundles, operations).await?; + let confirmed_value = self + .set_pending_block(incoming_bundles, operations, blobs) + .await?; match self.process_pending_block_without_prepare().await? { ClientOutcome::Committed(Some(certificate)) if certificate.value().block() == confirmed_value.inner().block() => @@ -2164,11 +2174,12 @@ where /// Sets the pending block, so that next time `process_pending_block_without_prepare` is /// called, it will be proposed to the validators. - #[instrument(level = "trace", skip(incoming_bundles, operations))] + #[instrument(level = "trace", skip(incoming_bundles, operations, blobs))] async fn set_pending_block( &self, incoming_bundles: Vec, operations: Vec, + blobs: Vec, ) -> Result { let identity = self.identity().await?; let (previous_block_hash, height, timestamp) = { @@ -2195,7 +2206,7 @@ where .stage_block_execution_and_discard_failing_messages(block) .await?; self.state_mut() - .set_pending_block(executed_block.block.clone()); + .set_pending_block(executed_block.block.clone(), blobs); Ok(HashedCertificateValue::new_confirmed(executed_block)) } @@ -2655,7 +2666,7 @@ where multi_leader_rounds: ownership.multi_leader_rounds, timeout_config: ownership.timeout_config, })]; - match self.execute_block(operations).await? { + match self.execute_block(operations, Vec::new()).await? { ExecuteBlockOutcome::Executed(certificate) => { return Ok(ClientOutcome::Committed(certificate)); } @@ -2719,7 +2730,7 @@ where application_permissions: application_permissions.clone(), }; let operation = Operation::System(SystemOperation::OpenChain(config)); - let certificate = match self.execute_block(vec![operation]).await? { + let certificate = match self.execute_block(vec![operation], Vec::new()).await? { ExecuteBlockOutcome::Executed(certificate) => certificate, ExecuteBlockOutcome::Conflict(_) => continue, ExecuteBlockOutcome::WaitForTimeout(timeout) => { @@ -2774,10 +2785,12 @@ where service_blob: Blob, bytecode_id: BytecodeId, ) -> Result, ChainClientError> { - self.add_pending_blobs([contract_blob, service_blob]).await; - self.execute_operation(Operation::System(SystemOperation::PublishBytecode { - bytecode_id, - })) + self.execute_operations( + vec![Operation::System(SystemOperation::PublishBytecode { + bytecode_id, + })], + vec![contract_blob, service_blob], + ) .await? .try_map(|certificate| Ok((bytecode_id, certificate))) } @@ -2797,8 +2810,8 @@ where }) }) .collect(); - self.add_pending_blobs(blobs).await; - self.execute_operations(publish_blob_operations).await + self.execute_operations(publish_blob_operations, blobs.collect()) + .await } /// Publishes some data blob. @@ -2810,13 +2823,6 @@ where self.publish_data_blobs(vec![bytes]).await } - /// Adds pending blobs - pub async fn add_pending_blobs(&self, pending_blobs: impl IntoIterator) { - for blob in pending_blobs { - self.state_mut().insert_pending_blob(blob); - } - } - /// Creates an application by instantiating some bytecode. #[instrument( level = "trace", @@ -2898,12 +2904,15 @@ where self.prepare_chain().await?; let epoch = self.epoch().await?; match self - .execute_block(vec![Operation::System(SystemOperation::Admin( - AdminOperation::CreateCommittee { - epoch: epoch.try_add_one()?, - committee: committee.clone(), - }, - ))]) + .execute_block( + vec![Operation::System(SystemOperation::Admin( + AdminOperation::CreateCommittee { + epoch: epoch.try_add_one()?, + committee: committee.clone(), + }, + ))], + Vec::new(), + ) .await? { ExecuteBlockOutcome::Executed(certificate) => { @@ -2948,7 +2957,7 @@ where if incoming_bundles.is_empty() { return Ok((certificates, None)); } - match self.execute_block(vec![]).await { + match self.execute_block(Vec::new(), Vec::new()).await { Ok(ExecuteBlockOutcome::Executed(certificate)) | Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate), Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => { @@ -3006,7 +3015,7 @@ where } }) .collect(); - self.execute_without_prepare(operations).await + self.execute_without_prepare(operations, Vec::new()).await } /// Sends money to a chain. diff --git a/linera-core/src/unit_tests/client_tests.rs b/linera-core/src/unit_tests/client_tests.rs index e27e3a6e826..ce0d34cb572 100644 --- a/linera-core/src/unit_tests/client_tests.rs +++ b/linera-core/src/unit_tests/client_tests.rs @@ -881,7 +881,7 @@ where // Since blocks are free of charge on closed chains, empty blocks are not allowed. assert_matches!( - client1.execute_operations(vec![]).await, + client1.execute_operations(vec![], vec![]).await, Err(ChainClientError::LocalNodeError( LocalNodeError::WorkerError(WorkerError::ChainError(error)) )) if matches!(*error, ChainError::ClosedChain) @@ -1473,7 +1473,6 @@ where let blob1 = Blob::new_data(b"blob1".to_vec()); let blob1_hash = blob1.id().hash; - client2_a.add_pending_blobs([blob1]).await; let blob_0_1_operations = vec![ Operation::System(SystemOperation::ReadBlob { blob_id: blob0_id }), Operation::System(SystemOperation::PublishDataBlob { @@ -1481,7 +1480,7 @@ where }), ]; let b0_result = client2_a - .execute_operations(blob_0_1_operations.clone()) + .execute_operations(blob_0_1_operations.clone(), vec![blob1]) .await; assert!(b0_result.is_err()); @@ -1617,7 +1616,6 @@ where let blob1 = Blob::new_data(b"blob1".to_vec()); let blob1_hash = blob1.id().hash; - client2_a.add_pending_blobs([blob1]).await; let blob_0_1_operations = vec![ Operation::System(SystemOperation::ReadBlob { blob_id: blob0_id }), Operation::System(SystemOperation::PublishDataBlob { @@ -1625,7 +1623,7 @@ where }), ]; let b0_result = client2_a - .execute_operations(blob_0_1_operations.clone()) + .execute_operations(blob_0_1_operations.clone(), vec![blob1]) .await; assert!(b0_result.is_err()); @@ -1794,7 +1792,6 @@ where let blob1 = Blob::new_data(b"blob1".to_vec()); let blob1_hash = blob1.id().hash; - client3_a.add_pending_blobs([blob1]).await; let blob_0_1_operations = vec![ Operation::System(SystemOperation::ReadBlob { blob_id: blob0_id }), Operation::System(SystemOperation::PublishDataBlob { @@ -1802,7 +1799,7 @@ where }), ]; let b0_result = client3_a - .execute_operations(blob_0_1_operations.clone()) + .execute_operations(blob_0_1_operations.clone(), vec![blob1]) .await; assert!(b0_result.is_err()); @@ -1869,7 +1866,6 @@ where let blob3 = Blob::new_data(b"blob3".to_vec()); let blob3_hash = blob3.id().hash; - client3_b.add_pending_blobs([blob3]).await; let blob_2_3_operations = vec![ Operation::System(SystemOperation::ReadBlob { blob_id: blob2_id }), Operation::System(SystemOperation::PublishDataBlob { @@ -1877,7 +1873,7 @@ where }), ]; let b1_result = client3_b - .execute_operations(blob_2_3_operations.clone()) + .execute_operations(blob_2_3_operations.clone(), vec![blob3]) .await; assert!(b1_result.is_err()); @@ -1930,8 +1926,10 @@ where builder.set_fault_type([0, 2, 3], FaultType::Honest).await; client3_c.synchronize_from_validators().await.unwrap(); + let blob4_data = b"blob4".to_vec(); + let blob4 = Blob::from(BlobContent::new_data(blob4_data.clone())); let bt_certificate = client3_c - .burn(None, Amount::from_tokens(1)) + .publish_data_blob(blob4_data) .await .unwrap() .unwrap(); @@ -1947,10 +1945,8 @@ where .block() .unwrap() .operations - .contains(&Operation::System(SystemOperation::Transfer { - owner: None, - recipient: Recipient::Burn, - amount: Amount::from_tokens(1), + .contains(&Operation::System(SystemOperation::PublishDataBlob { + blob_hash: blob4.id().hash }))); // Block before that should be b1 diff --git a/linera-core/src/unit_tests/wasm_client_tests.rs b/linera-core/src/unit_tests/wasm_client_tests.rs index 9d50a1e5c51..b265c573099 100644 --- a/linera-core/src/unit_tests/wasm_client_tests.rs +++ b/linera-core/src/unit_tests/wasm_client_tests.rs @@ -929,7 +929,10 @@ async fn test_memory_fuel_limit(wasm_runtime: WasmRuntime) -> anyhow::Result<()> .unwrap(); assert!(publisher - .execute_operations(vec![Operation::user(application_id, &increment)?; 10]) + .execute_operations( + vec![Operation::user(application_id, &increment)?; 10], + vec![] + ) .await .is_err()); diff --git a/linera-service/src/node_service.rs b/linera-service/src/node_service.rs index db3ed1b0ed6..82c237d585d 100644 --- a/linera-service/src/node_service.rs +++ b/linera-service/src/node_service.rs @@ -1063,7 +1063,10 @@ where chain_id: chain_id.to_string(), })?; let hash = loop { - let timeout = match client.execute_operations(operations.clone()).await? { + let timeout = match client + .execute_operations(operations.clone(), Vec::new()) + .await? + { ClientOutcome::Committed(certificate) => break certificate.value.hash(), ClientOutcome::WaitForTimeout(timeout) => timeout, };