Skip to content

Commit

Permalink
1.Fix order failure bug 2.Feat:no ordering when bulk mode
Browse files Browse the repository at this point in the history
  • Loading branch information
sulijia committed Mar 21, 2024
1 parent 89bd7a3 commit 6a7edc8
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 57 deletions.
16 changes: 8 additions & 8 deletions client/consensus/aura/src/collator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,10 @@ where
inherent_data: (ParachainInherentData, InherentData),
proposal_duration: Duration,
max_pov_size: usize,
) -> Result<
Option<(Collation, ParachainBlockData<Block>, Block::Hash)>,
Box<dyn Error + Send + 'static>,
>{
) -> Result<
Option<(Collation, ParachainBlockData<Block>, Block::Hash)>,
Box<dyn Error + Send + 'static>,
> {
let mut digest = additional_pre_digest.into().unwrap_or_default();
digest.push(slot_claim.pre_digest.clone());

Expand All @@ -214,10 +214,10 @@ where
.await
.map_err(|e| Box::new(e) as Box<dyn Error + Send>)?;

let proposal = match maybe_proposal {
None => return Ok(None),
Some(p) => p,
};
let proposal = match maybe_proposal {
None => return Ok(None),
Some(p) => p,
};

let sealed_importable = seal::<_, P>(
proposal.block,
Expand Down
11 changes: 7 additions & 4 deletions client/consensus/aura/src/collators/on_demand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// along with Magnet. If not, see <http://www.gnu.org/licenses/>.

use codec::{Codec, Decode};
use cumulus_client_collator::{relay_chain_driven::CollationRequest, service::ServiceInterface as CollatorServiceInterface};
use cumulus_client_collator::{
relay_chain_driven::CollationRequest, service::ServiceInterface as CollatorServiceInterface,
};
use cumulus_client_consensus_common::ParachainBlockImportMarker;
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_core::{
Expand All @@ -30,6 +32,7 @@ use polkadot_primitives::{CollatorPair, Id as ParaId};
use cumulus_primitives_core::PersistedValidationData;
use futures::lock::Mutex;
use futures::prelude::*;
use futures::{channel::mpsc::Receiver, prelude::*};
use magnet_primitives_order::OrderRecord;
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
use sc_consensus::BlockImport;
Expand All @@ -43,7 +46,6 @@ use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
use std::{convert::TryFrom, sync::Arc, time::Duration};
use futures::{channel::mpsc::Receiver, prelude::*};
/// Parameters for [`run`].
pub struct Params<BI, CIDP, Client, RClient, SO, Proposer, CS> {
/// Inherent data providers. Only non-consensus inherent data should be provided, i.e.
Expand Down Expand Up @@ -116,13 +118,14 @@ where
async move {
let mut collation_requests = match params.collation_request_receiver {
Some(receiver) => receiver,
None =>
None => {
cumulus_client_collator::relay_chain_driven::init(
params.collator_key,
params.para_id,
params.overseer_handle,
)
.await,
.await
},
};

let mut collator = {
Expand Down
24 changes: 12 additions & 12 deletions node/src/chain_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,17 @@ fn testnet_genesis(
],
// Genesis metadata: Vec<(id, name, symbol, decimals)>
"metadata": vec![
(1, "asset-1", "ALT1", 18),
(2, "asset-2", "ALT2", 18),
(1, b"asset-1".to_vec(), b"ALT1".to_vec(), 18),
(2, b"asset-2".to_vec(), b"ALT2".to_vec(), 18),
],
// Genesis accounts: Vec<(id, account_id, balance)>
"accounts": vec![
(1, alice, 500_000_000_0000_0000_0000u128),
(2, bob, 500_000_000_0000_0000_0000u128),
],
},
"assets_bridge": {
"admin_key": Some(root.clone()),
"assetsBridge": {
"adminKey": Some(root.clone()),
},
"council": {
"members": endowed_accounts
Expand All @@ -250,12 +250,12 @@ fn testnet_genesis(
.filter_map(|(idx, acc)| if idx % 2 == 0 { Some(acc.clone()) } else { None })
.collect::<Vec<_>>(),
},
"parachain_info": {
"parachain_id": id,
"parachainInfo": {
"parachainId": id,
},
"collator_selection": {
"collatorSelection": {
"invulnerables": invulnerables.iter().cloned().map(|(acc, _)| acc).collect::<Vec<_>>(),
"candidacy_bond": EXISTENTIAL_DEPOSIT * 16,
"candidacyBond": EXISTENTIAL_DEPOSIT * 16,
},
"session": {
"keys": invulnerables
Expand All @@ -271,14 +271,14 @@ fn testnet_genesis(
},
// no need to pass anything to aura, in fact it will panic if we do. Session will take care
// of this.
"polkadot_xcm": {
"safe_xcm_version": Some(SAFE_XCM_VERSION),
"polkadotXcm": {
"safeXcmVersion": Some(SAFE_XCM_VERSION),
},
"sudo": { "key": Some(root.clone()) },

// EVM compatibility
"evm_chain_id": {
"chain_id": u64::from(u32::from(id)),
"evmChainId": {
"chainId": u64::from(u32::from(id)),
},

"evm": { "accounts": evm_accounts },
Expand Down
5 changes: 5 additions & 0 deletions node/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38386,6 +38386,11 @@ pub mod api {
:: subxt :: ext :: scale_decode :: DecodeAsType,
:: subxt :: ext :: scale_encode :: EncodeAsType,
Debug,
Clone,
Eq,
PartialEq,
Ord,
PartialOrd,
)]
# [codec (crate = :: subxt :: ext :: codec)]
#[decode_as_type(crate_path = ":: subxt :: ext :: scale_decode")]
Expand Down
44 changes: 41 additions & 3 deletions node/src/on_demand_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@
// You should have received a copy of the GNU General Public License
// along with Magnet. If not, see <http://www.gnu.org/licenses/>.

use crate::{submit_order::build_rpc_for_submit_order, submit_order::SubmitOrderError};
use crate::{
metadata::api::runtime_types::pallet_broker::coretime_interface::CoreAssignment,
submit_order::{build_rpc_for_submit_order, SubmitOrderError},
};
use codec::{Codec, Decode};

use crate::metadata::api::runtime_types::polkadot_runtime_parachains::assigner_coretime::CoreDescriptor;
use cumulus_primitives_core::{
relay_chain::BlockNumber as RelayBlockNumber, ParaId, PersistedValidationData,
};
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use futures::{lock::Mutex, pin_mut, select, FutureExt, Stream, StreamExt};
use magnet_primitives_order::{
self,
well_known_keys::paras_para_lifecycles,
well_known_keys::{paras_core_descriptors, paras_para_lifecycles},
well_known_keys::{ACTIVE_CONFIG, ON_DEMAND_QUEUE, SPOT_TRAFFIC, SYSTEM_EVENTS},
OrderRecord, OrderRuntimeApi, OrderStatus,
};
Expand Down Expand Up @@ -95,14 +99,38 @@ where
async fn start_on_demand(
relay_chain: impl RelayChainInterface + Clone,
hash: H256,
para_id: ParaId,
) -> Option<bool> {
let active_config_storage = relay_chain.get_storage_by_key(hash, ACTIVE_CONFIG).await.ok()?;
let p_active_config = active_config_storage
.map(|raw| <HostConfiguration<u32>>::decode(&mut &raw[..]))
.transpose()
.ok()?;
if p_active_config.is_some() {
let result = p_active_config.unwrap().coretime_cores > 0;
let mut result = false;
let cores = p_active_config.unwrap().coretime_cores;
for core in 0..cores {
let key = paras_core_descriptors(polkadot_primitives::CoreIndex(core));
let core_descriptors_storage =
relay_chain.get_storage_by_key(hash, key.as_slice()).await.ok()?;
let p_core_descriptors = core_descriptors_storage
.map(|raw| <CoreDescriptor<u32>>::decode(&mut &raw[..]))
.transpose()
.ok()?;
if p_core_descriptors.is_some() {
let p_current_work = p_core_descriptors?.current_work;
if p_current_work.is_some() {
let current_work = p_current_work?;
for (assign, _) in current_work.assignments {
if assign == CoreAssignment::Task(para_id.into()) {
return Some(false);
} else if assign == CoreAssignment::Pool {
result = true
}
}
}
}
}
Some(result)
} else {
None
Expand All @@ -118,6 +146,7 @@ async fn try_place_order<Balance>(
slot_block: u32,
height: RelayBlockNumber,
relay_chain: impl RelayChainInterface + Clone,
number: u32,
) -> Result<(), SubmitOrderError>
where
Balance: Codec + MaybeDisplay + 'static + Debug + Into<u128>,
Expand All @@ -133,6 +162,7 @@ where
slot_block,
height,
relay_chain,
number,
)
.await
}
Expand Down Expand Up @@ -288,6 +318,13 @@ where
return Ok(());
} else {
//parathread
let p_on_demand = start_on_demand(relay_chain.clone(), p_hash, para_id).await;
if p_on_demand.is_some() {
let on_demand = p_on_demand.unwrap();
if !on_demand {
return Ok(());
}
}
let order_record_local = order_record.lock().await;
if order_record_local.relay_height == height {
return Ok(());
Expand Down Expand Up @@ -420,6 +457,7 @@ where
slot_block,
height,
relay_chain.clone(),
order_record_local.relay_base_height,
)
.await;
log::info!("===========place order completed==============",);
Expand Down
33 changes: 4 additions & 29 deletions node/src/submit_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub async fn build_rpc_for_submit_order(
slot_block: u32,
height: RelayBlockNumber,
relay_chain: impl RelayChainInterface + Clone,
number: u32,
) -> Result<(), SubmitOrderError> {
let client = OnlineClient::<PolkadotConfig>::from_url(url)
.await
Expand All @@ -122,36 +123,10 @@ pub async fn build_rpc_for_submit_order(

let signer_keystore = SignerKeystore::<PolkadotConfig>::new(keystore.clone());

// not init
let mut relay_hash = hash;
let mut for_n_blocks = slot_block;
if hash == H256::from([0; 32]) {
let chunk = u32::MAX - (slot_block - 1);
let r_relay_head = relay_chain
.header(BlockId::Number(height))
.await
.map_err(|_e| SubmitOrderError::GetHeadError)?;
if let Some(relay_head) = r_relay_head {
relay_hash = relay_head.hash();
let nex_relay_height = height + slot_block;
let nex_relay_height_align = nex_relay_height & chunk;
let height_chunk = nex_relay_height_align - height;
for_n_blocks = height_chunk;
} else {
return Err(SubmitOrderError::GetHeadError);
}
}
let latest_block = client
.blocks()
.at(relay_hash)
.await
.map_err(|_e| SubmitOrderError::GetBlockError)?;

let tx_params = Params::new().mortal(latest_block.header(), for_n_blocks.into()).build();
// let tx_params = Params::new().mortal_unchecked(number.into(), hash , slot_block.into()).build();

let submit_result =
client.tx().sign_and_submit(&place_order, &signer_keystore, tx_params).await;
log::info!("submit_result:{:?},{:?},{:?}", submit_result, height, relay_hash);
let submit_result = client.tx().sign_and_submit_default(&place_order, &signer_keystore).await;
log::info!("submit_result:{:?},{:?},{:?}", submit_result, height, hash);
submit_result.map_err(|_e| SubmitOrderError::RPCCallException)?;

Ok(())
Expand Down
17 changes: 16 additions & 1 deletion primitives/order/src/well_known_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@

#![cfg_attr(not(feature = "std"), no_std)]

use {cumulus_primitives_core::ParaId, sp_core::Encode, sp_io::hashing::twox_64, sp_std::vec::Vec};
use cumulus_primitives_core::relay_chain::CoreIndex;
use {
cumulus_primitives_core::ParaId,
sp_core::Encode,
sp_io::hashing::{twox_256, twox_64},
sp_std::vec::Vec,
};

pub const PARAS_PARA_LIFECYCLES: &[u8] =
&hex_literal::hex!["cd710b30bd2eab0352ddcc26417aa194281e0bfde17b36573208a06cb5cfba6b"];
Expand Down Expand Up @@ -53,3 +59,12 @@ pub const SPOT_TRAFFIC: &[u8] =
//Configuration ActiveConfig
pub const ACTIVE_CONFIG: &[u8] =
&hex_literal::hex!["06de3d8a54d27e44a9d5ce189618f22db4b49d95320d9021994c850f25b8e385"];

pub const CORE_DESCRIPTORS: &[u8] =
&hex_literal::hex!["638595eebaa445ce03a13547bece90e704e6ac775a3245623103ffec2cb2c92f"];

pub fn paras_core_descriptors(core_index: CoreIndex) -> Vec<u8> {
core_index.using_encoded(|core_index: &[u8]| {
CORE_DESCRIPTORS.iter().chain(twox_256(core_index).iter()).cloned().collect()
})
}

0 comments on commit 6a7edc8

Please sign in to comment.