diff --git a/ci_checks.sh b/ci_checks.sh index d30a1c43ee..240a5f44de 100755 --- a/ci_checks.sh +++ b/ci_checks.sh @@ -29,7 +29,7 @@ cargo check -p fuel-core-chain-config --target wasm32-unknown-unknown --no-defau cargo check -p fuel-core-executor --target wasm32-unknown-unknown --no-default-features --features alloc && cargo make check --all-features --locked && cargo make check --locked && -OVERRIDE_CHAIN_CONFIGS=true cargo test --test integration_tests local_node && +cargo test --test integration_tests local_node && cargo nextest run --workspace && cargo nextest run --all-features --workspace && cargo nextest run -p fuel-core --no-default-features && diff --git a/crates/services/shared-sequencer/src/lib.rs b/crates/services/shared-sequencer/src/lib.rs index ee42ed5816..3fddcf7b31 100644 --- a/crates/services/shared-sequencer/src/lib.rs +++ b/crates/services/shared-sequencer/src/lib.rs @@ -46,7 +46,7 @@ pub mod service; pub struct Client { endpoints: Endpoints, topic: [u8; 32], - chain_id: Id, + ss_chain_id: Id, gas_price: u128, coin_denom: Denom, account_prefix: String, @@ -61,7 +61,7 @@ impl Client { .map_err(|e| anyhow::anyhow!("{e:?}"))?; let account_prefix = http_api::get_account_prefix(&endpoints.blockchain_rest_api).await?; - let chain_id = http_api::chain_id(&endpoints.blockchain_rest_api) + let ss_chain_id = http_api::chain_id(&endpoints.blockchain_rest_api) .await? .parse() .map_err(|e| anyhow::anyhow!("{e:?}"))?; @@ -81,7 +81,7 @@ impl Client { endpoints, account_prefix, coin_denom, - chain_id, + ss_chain_id, gas_price, }) } @@ -165,7 +165,7 @@ impl Client { topic: [u8; 32], data: Bytes, ) -> anyhow::Result<()> { - // We want to estimate the transaction to know hat amount and fee to use. + // We want to estimate the transaction to know what amount and fee to use. // We use a dummy amount and fee to estimate the gas, and based on the result // we calculate the actual amount and fee to use in real transaction. let dummy_amount = Coin { @@ -242,7 +242,7 @@ impl Client { SignerInfo::single_direct(Some(sender_public_key), account.sequence); let auth_info = signer_info.auth_info(fee); let sign_doc = - SignDoc::new(&tx_body, &auth_info, &self.chain_id, account.account_number) + SignDoc::new(&tx_body, &auth_info, &self.ss_chain_id, account.account_number) .map_err(|err| anyhow!("{err:?}"))?; let sign_doc_bytes = sign_doc diff --git a/crates/services/shared-sequencer/src/service.rs b/crates/services/shared-sequencer/src/service.rs index df57e00db7..6747781e64 100644 --- a/crates/services/shared-sequencer/src/service.rs +++ b/crates/services/shared-sequencer/src/service.rs @@ -46,6 +46,7 @@ pub struct Task { account_metadata: Option, prev_order: Option, blobs: Arc>, + interval: tokio::time::Interval, } impl NonInitializedTask { @@ -126,6 +127,7 @@ where } Ok(Task { + interval: tokio::time::interval(self.config.block_posting_frequency), shared_sequencer_client, config: self.config, signer: self.signer, @@ -140,48 +142,30 @@ impl Task where S: Signer, { - // This function is not cancel-safe because it calls `sleep` inside. - async fn blobs(&mut self) -> anyhow::Result> { + /// Fetch latest account metadata if it's not set + async fn ensure_account_metadata(&mut self) -> anyhow::Result<()> { + if self.account_metadata.is_some() { + return Ok(()); + } let ss = self .shared_sequencer_client .as_ref() - .expect("Shared sequencer client is not set; qed"); - - if self.account_metadata.is_none() { - // If the account is not funded, this code will fail - // because we can't sign the transaction without account metadata. - let account_metadata = ss.get_account_meta(self.signer.as_ref()).await; - - match account_metadata { - Ok(account_metadata) => { - self.account_metadata = Some(account_metadata); - } - Err(err) => { - // We don't want to spam the RPC endpoint with a lot of queries, - // so wait for one second before sending the next one. - tokio::time::sleep(Duration::from_secs(1)).await; - return Err(err); - } - } - } + .expect("Shared sequencer client is not set"); + self.account_metadata = Some(ss.get_account_meta(self.signer.as_ref()).await?); + Ok(()) + } - if self.prev_order.is_none() { - self.prev_order = ss.get_topic().await?.map(|f| f.order); - } - - tokio::time::sleep(self.config.block_posting_frequency).await; - - let blobs = { - let mut lock = self.blobs.lock().await; - core::mem::take(&mut *lock) - }; - - if blobs.is_empty() { - tokio::time::sleep(Duration::from_secs(1)).await; - Ok(None) - } else { - Ok(Some(blobs)) + /// Fetch previous order in the topic if it's not set + async fn ensure_prev_order(&mut self) -> anyhow::Result<()> { + if self.prev_order.is_some() { + return Ok(()); } + let ss = self + .shared_sequencer_client + .as_ref() + .expect("Shared sequencer client is not set"); + self.prev_order = ss.get_topic().await?.map(|f| f.order); + Ok(()) } } @@ -196,49 +180,46 @@ where return TaskNextAction::Stop; } + if let Err(err) = self.ensure_account_metadata().await { + // We don't want to spam the RPC endpoint with a lot of queries, + // so wait for one second before sending the next one. + tokio::time::sleep(Duration::from_secs(1)).await; + return TaskNextAction::ErrorContinue(err) + } + if let Err(err) = self.ensure_prev_order().await { + return TaskNextAction::ErrorContinue(err) + }; + tokio::select! { biased; _ = watcher.while_started() => { TaskNextAction::Stop }, - - blobs = self.blobs() => { - let blobs = match blobs { - Ok(blobs) => blobs, - Err(err) => return TaskNextAction::ErrorContinue(err), + _ = self.interval.tick() => { + let blobs = { + let mut lock = self.blobs.lock().await; + core::mem::take(&mut *lock) + }; + if blobs.is_empty() { + tokio::time::sleep(Duration::from_secs(1)).await; + return TaskNextAction::Continue; }; - // The `blobs` function is not cancel safe, as it calls sleep inside. - // If someone adds new logic into the `tokio::select`, please - // rework the `blobs` function to be cancel safe and use interval inside. - if let Some(blobs) = blobs { - let mut account = self.account_metadata.take().expect("Account metadata is not set; qed"); - let next_order = if let Some(prev_order) = self.prev_order { - prev_order.wrapping_add(1) - } else { - 0 - }; - - let ss = self.shared_sequencer_client - .as_ref().expect("Shared sequencer client is not set; qed"); - let blobs_bytes = postcard::to_allocvec(&blobs).expect("Failed to serialize SSBlob"); - let result = ss.send(self.signer.as_ref(), account, next_order, blobs_bytes).await; + let mut account = self.account_metadata.take().expect("Account metadata is not set"); + let next_order = self.prev_order.map(|prev| prev.wrapping_add(1)).unwrap_or(0); + let ss = self.shared_sequencer_client + .as_ref().expect("Shared sequencer client is not set"); + let blobs_bytes = postcard::to_allocvec(&blobs).expect("Failed to serialize SSBlob"); - match result { - Ok(_) => { - tracing::info!("Posted block to shared sequencer {blobs:?}"); - account.sequence = account.sequence.saturating_add(1); - self.prev_order = Some(next_order); - self.account_metadata = Some(account); - TaskNextAction::Continue - } - Err(err) => { - TaskNextAction::ErrorContinue(err) - } - } - } else { - TaskNextAction::Continue + if let Err(err) = ss.send(self.signer.as_ref(), account, next_order, blobs_bytes).await { + TaskNextAction::ErrorContinue(err); } + + tracing::info!("Posted block to shared sequencer {blobs:?}"); + account.sequence = account.sequence.saturating_add(1); + self.prev_order = Some(next_order); + self.account_metadata = Some(account); + TaskNextAction::Continue }, } }