diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 679dda7ad2..918a9ae679 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -55,7 +55,6 @@ jobs: !github.event.pull_request.head.repo.fork uses: fluencelabs/decider/.github/workflows/tests.yml@main with: - ref: "feature/brnd-19" test-cargo-dependencies: | [ { diff --git a/Cargo.lock b/Cargo.lock index e1f37c3680..efc6f9bd1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1359,6 +1359,7 @@ dependencies = [ "hex-utils", "jsonrpsee 0.22.5", "libipld", + "libp2p-identity", "log-utils", "mockito", "particle-args", @@ -2127,9 +2128,9 @@ dependencies = [ [[package]] name = "decider-distro" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bafeb2d55c8843ac62a86ae40b229a834bc6a9b89da46e7ffbd551cec43bd159" +checksum = "1f87fc5b72d3931ea4017ee61d7c99a7f98ec6f337b877f4e78fe14a708cb1ec" dependencies = [ "built 0.7.1", "fluence-spell-dtos", @@ -5923,10 +5924,14 @@ dependencies = [ name = "nox-tests" version = "0.1.0" dependencies = [ + "alloy-primitives", + "alloy-sol-types", "aquamarine", "base64 0.21.7", "blake3", "bs58", + "chain-connector", + "chain-data", "clarity", "connected-client", "connection-pool", @@ -5941,6 +5946,7 @@ dependencies = [ "fstrings", "futures", "hex", + "hex-utils", "humantime-serde", "itertools 0.13.0", "json-utils", @@ -5968,7 +5974,6 @@ dependencies = [ "service-modules", "sorcerer", "spell-event-bus", - "subnet-resolver", "system-services", "tempfile", "test-constants", @@ -6370,7 +6375,6 @@ dependencies = [ "serde", "serde_json", "service-modules", - "subnet-resolver", "tempfile", "thiserror", "tokio", @@ -8342,23 +8346,6 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" -[[package]] -name = "subnet-resolver" -version = "0.1.0" -dependencies = [ - "chain-data", - "ethabi", - "eyre", - "hex", - "hex-utils", - "jsonrpsee 0.22.5", - "libp2p-identity", - "serde", - "serde_json", - "thiserror", - "tokio", -] - [[package]] name = "subtle" version = "2.5.0" @@ -8573,6 +8560,7 @@ name = "test-utils" version = "0.3.0" dependencies = [ "base64 0.21.7", + "clarity", "connected-client", "cpu-utils", "eyre", @@ -8581,6 +8569,7 @@ dependencies = [ "maplit", "particle-args", "serde_json", + "server-config", "service-modules", "test-constants", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 9c72fb4e31..a7d07a8b12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,6 @@ members = [ "crates/health", "sorcerer", "crates/nox-tests", - "crates/subnet-resolver", "nox", "aquamarine", "particle-protocol", @@ -96,7 +95,6 @@ spell-storage = { path = "spell-storage" } particle-execution = { path = "particle-execution" } system-services = { path = "crates/system-services" } health = { path = "crates/health" } -subnet-resolver = { path = "crates/subnet-resolver" } hex-utils = { path = "crates/hex-utils" } chain-data = { path = "crates/chain-data" } chain-listener = { path = "crates/chain-listener" } diff --git a/crates/chain-connector/Cargo.toml b/crates/chain-connector/Cargo.toml index 983f8f69f4..4cfed47b52 100644 --- a/crates/chain-connector/Cargo.toml +++ b/crates/chain-connector/Cargo.toml @@ -10,7 +10,7 @@ particle-builtins = { workspace = true } particle-execution = { workspace = true } particle-args = { workspace = true } chain-data = { workspace = true } -jsonrpsee = { workspace = true, features = ["macros", "ws-client"] } +jsonrpsee = { workspace = true, features = ["macros", "ws-client", "http-client"] } eyre = { workspace = true } fluence-libp2p = { workspace = true } serde_json = { workspace = true } @@ -29,6 +29,7 @@ const-hex = { workspace = true } serde = { workspace = true } async-trait = { workspace = true } libipld = { workspace = true } +libp2p-identity = { workspace = true } [dev-dependencies] mockito = { workspace = true } diff --git a/crates/chain-connector/src/builtins.rs b/crates/chain-connector/src/builtins.rs new file mode 100644 index 0000000000..5fc34139ff --- /dev/null +++ b/crates/chain-connector/src/builtins.rs @@ -0,0 +1,197 @@ +/* + * Nox Fluence Peer + * + * Copyright (C) 2024 Fluence DAO + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation version 3 of the + * License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +use crate::types::{SubnetResolveResult, TxReceiptResult, TxStatus, Worker}; +use crate::{ChainConnector, HttpChainConnector}; +use ccp_shared::types::CUID; +use futures::FutureExt; +use particle_args::{Args, JError}; +use particle_builtins::{wrap, CustomService}; +use particle_execution::{ParticleParams, ServiceFunction}; +use serde_json::json; +use serde_json::Value as JValue; +use std::collections::HashMap; +use std::sync::Arc; +use types::peer_scope::WorkerId; +use types::DealId; + +// macro to generate a closure for a builtin function +macro_rules! make_builtin_closure { + ($connector:expr, $function:ident) => {{ + let connector = $connector.clone(); + ServiceFunction::Immut(Box::new(move |args, params| { + let connector = connector.clone(); + async move { wrap($function(connector, args, params).await) }.boxed() + })) + }}; +} + +pub(crate) fn make_connector_builtins( + connector: Arc, +) -> HashMap { + let mut builtins = HashMap::new(); + builtins.insert( + "connector".to_string(), + CustomService::new( + vec![ + ( + "get_deals", + make_builtin_closure!(connector, get_deals_builtin), + ), + ( + "register_worker", + make_builtin_closure!(connector, register_worker_builtin), + ), + ( + "get_tx_receipts", + make_builtin_closure!(connector, get_tx_receipts_builtin), + ), + ], + None, + ), + ); + // Legacy service name; Can be deprecated and moved to connector in the future + builtins.insert( + "subnet".to_string(), + CustomService::new( + vec![( + "resolve", + make_builtin_closure!(connector, resolve_subnet_builtin), + )], + None, + ), + ); + builtins +} + +async fn get_deals_builtin( + connector: Arc, + _args: Args, + params: ParticleParams, +) -> Result { + if params.init_peer_id != connector.host_id { + return Err(JError::new( + "Only the root worker can call connector.get_deals", + )); + } + + let deals = connector + .get_deals() + .await + .map_err(|err| JError::new(format!("Failed to get deals: {err}")))?; + Ok(json!(deals)) +} + +async fn register_worker_builtin( + connector: Arc, + args: Args, + params: ParticleParams, +) -> Result { + if params.init_peer_id != connector.host_id { + return Err(JError::new( + "Only the root worker can call connector.register_worker", + )); + } + + let mut args = args.function_args.into_iter(); + let deal_id: DealId = Args::next("deal_id", &mut args)?; + let worker_id: WorkerId = Args::next("worker_id", &mut args)?; + let cu_ids: Vec = Args::next("cu_id", &mut args)?; + + if cu_ids.len() != 1 { + return Err(JError::new("Only one cu_id is allowed")); + } + + let tx_hash = connector + .register_worker(&deal_id, worker_id, cu_ids[0]) + .await + .map_err(|err| JError::new(format!("Failed to register worker: {err}")))?; + Ok(json!(tx_hash)) +} + +async fn get_tx_receipts_builtin( + connector: Arc, + args: Args, + params: ParticleParams, +) -> Result { + if params.init_peer_id != connector.host_id { + return Err(JError::new( + "Only the root worker can call connector.get_tx_receipt", + )); + } + + let mut args = args.function_args.into_iter(); + + let tx_hashes: Vec = Args::next("tx_hashes", &mut args)?; + + let receipts = connector + .get_tx_receipts(tx_hashes) + .await + .map_err(|err| JError::new(format!("Failed to get tx receipts: {err}")))? + .into_iter() + .map(|tx_receipt| match tx_receipt { + Ok(receipt) => match receipt { + TxStatus::Pending => TxReceiptResult::pending(), + TxStatus::Processed(receipt) => TxReceiptResult::processed(receipt), + }, + Err(err) => TxReceiptResult::error(err.to_string()), + }) + .collect::>(); + + Ok(json!(receipts)) +} + +async fn resolve_subnet_builtin( + connector: Arc, + args: Args, + _params: ParticleParams, +) -> Result { + let deal_id: String = Args::next("deal_id", &mut args.function_args.into_iter())?; + let deal_id = DealId::from(deal_id); + + let workers: eyre::Result> = try { + if !deal_id.is_valid() { + Err(eyre::eyre!( + "Invalid deal id '{}': invalid length", + deal_id.as_str() + ))?; + } + + let units = connector.get_deal_compute_units(&deal_id).await?; + let workers: Result, _> = units + .into_iter() + .map(|unit| Worker::try_from(unit)) + .collect(); + workers? + }; + + let result = match workers { + Ok(workers) => SubnetResolveResult { + success: true, + workers, + error: vec![], + }, + Err(err) => SubnetResolveResult { + success: false, + workers: vec![], + error: vec![format!("{}", err)], + }, + }; + + Ok(json!(result)) +} diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs index ec97f8f17e..ae7b5d8467 100644 --- a/crates/chain-connector/src/connector.rs +++ b/crates/chain-connector/src/connector.rs @@ -27,34 +27,30 @@ use std::sync::Arc; use ccp_shared::types::{Difficulty, GlobalNonce, LocalNonce, ResultHash, CUID}; use clarity::{Transaction, Uint256}; use eyre::eyre; -use futures::FutureExt; use jsonrpsee::core::async_trait; use jsonrpsee::core::client::{BatchResponse, ClientT}; use jsonrpsee::core::params::{ArrayParams, BatchRequestBuilder}; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::rpc_params; -use serde_json::Value as JValue; use serde_json::{json, Value}; use tokio::sync::Mutex; +use crate::builtins::make_connector_builtins; +use crate::error::process_response; +use crate::eth_call::EthCall; +use crate::types::*; use crate::ConnectorError::{FieldNotFound, InvalidU256, ResponseParseError}; use crate::Deal::CIDV1; +use crate::Offer::{ComputePeer, ComputeUnit}; use crate::{CCStatus, Capacity, CommitmentId, Core, Deal, Offer}; use chain_data::{peer_id_to_bytes, BlockHeader}; use fluence_libp2p::PeerId; use hex_utils::{decode_hex, encode_hex_0x}; -use particle_args::{Args, JError}; -use particle_builtins::{wrap, CustomService}; -use particle_execution::{ParticleParams, ServiceFunction}; +use particle_builtins::CustomService; use server_config::ChainConfig; use types::peer_scope::WorkerId; use types::DealId; -use crate::error::process_response; -use crate::eth_call::EthCall; -use crate::types::*; -use crate::Offer::{ComputePeer, ComputeUnit}; - #[async_trait] pub trait ChainConnector: Send + Sync { async fn get_current_commitment_id(&self) -> Result>; @@ -87,18 +83,7 @@ pub struct HttpChainConnector { client: Arc, config: ChainConfig, tx_nonce_mutex: Arc>>, - host_id: PeerId, -} - -pub struct CCInitParams { - pub difficulty: Difficulty, - pub init_timestamp: U256, - pub current_timestamp: U256, - pub global_nonce: GlobalNonce, - pub current_epoch: U256, - pub epoch_duration: U256, - pub min_proofs_per_epoch: U256, - pub max_proofs_per_epoch: U256, + pub(crate) host_id: PeerId, } impl HttpChainConnector { @@ -115,151 +100,10 @@ impl HttpChainConnector { host_id, }); - let builtins = Self::make_connector_builtins(connector.clone()); + let builtins = make_connector_builtins(connector.clone()); Ok((connector, builtins)) } - fn make_connector_builtins(connector: Arc) -> HashMap { - let mut builtins = HashMap::new(); - builtins.insert( - "connector".to_string(), - CustomService::new( - vec![ - ("send_tx", Self::make_send_tx_closure(connector.clone())), - ("get_deals", Self::make_get_deals_closure(connector.clone())), - ( - "register_worker", - Self::make_register_worker_closure(connector.clone()), - ), - ( - "get_tx_receipts", - Self::make_get_tx_receipts_closure(connector.clone()), - ), - ], - None, - ), - ); - builtins - } - - fn make_send_tx_closure(connector: Arc) -> ServiceFunction { - ServiceFunction::Immut(Box::new(move |args, params| { - let connector = connector.clone(); - async move { wrap(connector.send_tx_builtin(args, params).await) }.boxed() - })) - } - - fn make_get_deals_closure(connector: Arc) -> ServiceFunction { - ServiceFunction::Immut(Box::new(move |_, params| { - let connector = connector.clone(); - async move { wrap(connector.get_deals_builtin(params).await) }.boxed() - })) - } - - fn make_register_worker_closure(connector: Arc) -> ServiceFunction { - ServiceFunction::Immut(Box::new(move |args, params| { - let connector = connector.clone(); - async move { wrap(connector.register_worker_builtin(args, params).await) }.boxed() - })) - } - - fn make_get_tx_receipts_closure(connector: Arc) -> ServiceFunction { - ServiceFunction::Immut(Box::new(move |args, params| { - let connector = connector.clone(); - async move { wrap(connector.get_tx_receipts_builtin(args, params).await) }.boxed() - })) - } - - // TODO: do we still need this builtin? - async fn send_tx_builtin( - &self, - args: Args, - params: ParticleParams, - ) -> std::result::Result { - if params.init_peer_id != self.host_id { - return Err(JError::new("Only the root worker can send transactions")); - } - - let mut args = args.function_args.into_iter(); - let data: Vec = Args::next("data", &mut args)?; - let to: String = Args::next("to", &mut args)?; - let tx_hash = self - .send_tx(data, &to) - .await - .map_err(|err| JError::new(format!("Failed to send tx: {err}")))?; - Ok(json!(tx_hash)) - } - - async fn get_deals_builtin( - &self, - params: ParticleParams, - ) -> std::result::Result { - if params.init_peer_id != self.host_id { - return Err(JError::new("Only the root worker can call connector")); - } - - let deals = self - .get_deals() - .await - .map_err(|err| JError::new(format!("Failed to get deals: {err}")))?; - Ok(json!(deals)) - } - - async fn register_worker_builtin( - &self, - args: Args, - params: ParticleParams, - ) -> std::result::Result { - if params.init_peer_id != self.host_id { - return Err(JError::new("Only the root worker can call connector")); - } - - let mut args = args.function_args.into_iter(); - let deal_id: DealId = Args::next("deal_id", &mut args)?; - let worker_id: WorkerId = Args::next("worker_id", &mut args)?; - let cu_ids: Vec = Args::next("cu_id", &mut args)?; - - if cu_ids.len() != 1 { - return Err(JError::new("Only one cu_id is allowed")); - } - - let tx_hash = self - .register_worker(&deal_id, worker_id, cu_ids[0]) - .await - .map_err(|err| JError::new(format!("Failed to register worker: {err}")))?; - Ok(json!(tx_hash)) - } - - async fn get_tx_receipts_builtin( - &self, - args: Args, - params: ParticleParams, - ) -> std::result::Result { - if params.init_peer_id != self.host_id { - return Err(JError::new("Only the root worker can call connector")); - } - - let mut args = args.function_args.into_iter(); - - let tx_hashes: Vec = Args::next("tx_hashes", &mut args)?; - - let receipts = self - .get_tx_receipts(tx_hashes) - .await - .map_err(|err| JError::new(format!("Failed to get tx receipts: {err}")))? - .into_iter() - .map(|tx_receipt| match tx_receipt { - Ok(receipt) => match receipt { - TxStatus::Pending => TxReceiptResult::pending(), - TxStatus::Processed(receipt) => TxReceiptResult::processed(receipt), - }, - Err(err) => TxReceiptResult::error(err.to_string()), - }) - .collect::>(); - - Ok(json!(receipts)) - } - async fn get_base_fee_per_gas(&self) -> Result { if let Some(fee) = self.config.default_base_fee { return Ok(Uint::from(fee)); @@ -518,53 +362,55 @@ impl HttpChainConnector { fn difficulty_params(&self) -> ArrayParams { let data = Core::difficultyCall {}.abi_encode(); - rpc_params![ - EthCall::to(&data, &self.config.core_contract_address), - "latest" - ] + self.make_latest_diamond_rpc_params(data) + } + + pub async fn get_deal_compute_units(&self, deal_id: &DealId) -> Result> { + let data = Deal::getComputeUnitsCall {}.abi_encode(); + let resp: String = process_response( + self.client + .request( + "eth_call", + rpc_params![EthCall::to(&data, &deal_id.to_address()), "latest"], + ) + .await, + )?; + let bytes = decode_hex(&resp)?; + let compute_units = as SolType>::abi_decode(&bytes, true)?; + + Ok(compute_units) } fn init_timestamp_params(&self) -> ArrayParams { let data = Core::initTimestampCall {}.abi_encode(); - rpc_params![ - EthCall::to(&data, &self.config.core_contract_address), - "latest" - ] + self.make_latest_diamond_rpc_params(data) } fn global_nonce_params(&self) -> ArrayParams { let data = Capacity::getGlobalNonceCall {}.abi_encode(); - rpc_params![ - EthCall::to(&data, &self.config.cc_contract_address), - "latest" - ] + self.make_latest_diamond_rpc_params(data) } fn current_epoch_params(&self) -> ArrayParams { let data = Core::currentEpochCall {}.abi_encode(); - rpc_params![ - EthCall::to(&data, &self.config.core_contract_address), - "latest" - ] + self.make_latest_diamond_rpc_params(data) } fn epoch_duration_params(&self) -> ArrayParams { let data = Core::epochDurationCall {}.abi_encode(); - rpc_params![ - EthCall::to(&data, &self.config.core_contract_address), - "latest" - ] + self.make_latest_diamond_rpc_params(data) } fn min_proofs_per_epoch_params(&self) -> ArrayParams { let data = Core::minProofsPerEpochCall {}.abi_encode(); - rpc_params![ - EthCall::to(&data, &self.config.core_contract_address), - "latest" - ] + self.make_latest_diamond_rpc_params(data) } fn max_proofs_per_epoch_params(&self) -> ArrayParams { let data = Core::maxProofsPerEpochCall {}.abi_encode(); + self.make_latest_diamond_rpc_params(data) + } + + fn make_latest_diamond_rpc_params(&self, data: Vec) -> ArrayParams { rpc_params![ - EthCall::to(&data, &self.config.core_contract_address), + EthCall::to(&data, &self.config.diamond_contract_address), "latest" ] } @@ -580,13 +426,7 @@ impl ChainConnector for HttpChainConnector { .abi_encode(); let resp: String = process_response( self.client - .request( - "eth_call", - rpc_params![ - EthCall::to(&data, &self.config.market_contract_address), - "latest" - ], - ) + .request("eth_call", self.make_latest_diamond_rpc_params(data)) .await, )?; let compute_peer = ::abi_decode(&decode_hex(&resp)?, true)?; @@ -656,13 +496,7 @@ impl ChainConnector for HttpChainConnector { let resp: String = process_response( self.client - .request( - "eth_call", - rpc_params![ - EthCall::to(&data, &self.config.market_contract_address), - "latest" - ], - ) + .request("eth_call", self.make_latest_diamond_rpc_params(data)) .await, )?; let bytes = decode_hex(&resp)?; @@ -679,13 +513,7 @@ impl ChainConnector for HttpChainConnector { let resp: String = process_response( self.client - .request( - "eth_call", - rpc_params![ - EthCall::to(&data, &self.config.cc_contract_address), - "latest" - ], - ) + .request("eth_call", self.make_latest_diamond_rpc_params(data)) .await, )?; Ok(::abi_decode( @@ -725,7 +553,8 @@ impl ChainConnector for HttpChainConnector { } .abi_encode(); - self.send_tx(data, &self.config.cc_contract_address).await + self.send_tx(data, &self.config.diamond_contract_address) + .await } async fn get_deal_statuses(&self, deal_ids: Vec) -> Result>> { @@ -757,7 +586,7 @@ impl ChainConnector for HttpChainConnector { } .abi_encode(); - self.send_tx(data, &self.config.market_contract_address) + self.send_tx(data, &self.config.diamond_contract_address) .await } @@ -838,9 +667,7 @@ mod tests { let (connector, _) = HttpChainConnector::new( server_config::ChainConfig { http_endpoint: url.to_string(), - cc_contract_address: "0x0E62f5cfA5189CA34E79CCB03829C064405790aD".to_string(), - core_contract_address: "0x2f5224b7Cb8bd98d9Ef61c247F4741758E8E873d".to_string(), - market_contract_address: "0x1dC1eB8fc8dBc35be6fE75ceba05C7D410a2e721".to_string(), + diamond_contract_address: "0x2f5224b7Cb8bd98d9Ef61c247F4741758E8E873d".to_string(), network_id: 3525067388221321, wallet_key: PrivateKey::from_str( "0x97a2456e78c4894c62eef6031972d1ca296ed40bf311ab54c231f13db59fc428", diff --git a/crates/chain-connector/src/function/deal.rs b/crates/chain-connector/src/function/deal.rs index 6e5f04c896..234910d7c2 100644 --- a/crates/chain-connector/src/function/deal.rs +++ b/crates/chain-connector/src/function/deal.rs @@ -44,6 +44,14 @@ sol! { SMALL_BALANCE } + struct ComputeUnit { + bytes32 id; + bytes32 workerId; + bytes32 peerId; + address provider; + uint256 joinedEpoch; + } + /// @dev Returns the status of the deal function getStatus() external view returns (Status); @@ -52,6 +60,9 @@ sol! { /// @dev Set worker ID for a compute unit. Compute unit can have only one worker ID function setWorker(bytes32 computeUnitId, bytes32 workerId) external; + + /// @dev Returns the compute units info by provider + function getComputeUnits() public view returns (ComputeUnit[] memory); } } diff --git a/crates/chain-connector/src/lib.rs b/crates/chain-connector/src/lib.rs index c0947c6fa7..54b3a9953c 100644 --- a/crates/chain-connector/src/lib.rs +++ b/crates/chain-connector/src/lib.rs @@ -26,11 +26,13 @@ mod connector; mod error; mod function; +mod builtins; mod eth_call; mod types; -pub use connector::CCInitParams; +pub use self::types::CCInitParams; pub use connector::ChainConnector; pub use connector::HttpChainConnector; pub use error::ConnectorError; pub use function::*; +pub use types::SubnetResolveResult; diff --git a/crates/chain-connector/src/types.rs b/crates/chain-connector/src/types.rs index 9689b6a91c..fd973adf96 100644 --- a/crates/chain-connector/src/types.rs +++ b/crates/chain-connector/src/types.rs @@ -16,13 +16,15 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -use ccp_shared::types::CUID; -use serde::{Deserialize, Serialize}; - -use types::DealId; - use crate::error::ConnectorError; use crate::function::Deal; +use crate::Deal::ComputeUnit; +use alloy_primitives::U256; +use ccp_shared::types::{Difficulty, GlobalNonce, CUID}; +use chain_data::parse_peer_id; +use eyre::{eyre, Report}; +use serde::{Deserialize, Serialize}; +use types::DealId; pub type Result = std::result::Result; @@ -138,3 +140,50 @@ impl RawTxReceipt { } } } + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct Worker { + pub cu_ids: Vec, + pub host_id: String, + pub worker_id: Vec, +} + +impl TryFrom for Worker { + type Error = Report; + fn try_from(unit: ComputeUnit) -> eyre::Result { + let mut worker_id = vec![]; + if !unit.workerId.is_zero() { + let w_id = parse_peer_id(&unit.workerId.0) + .map_err(|err| eyre!("Failed to parse unit.workerId: {err}"))? + .to_base58(); + worker_id.push(w_id) + } + let cu_id = unit.id.to_string(); + let peer_id = parse_peer_id(&unit.peerId.0) + .map_err(|err| eyre!("Failed to parse unit.peerId: {err}"))?; + + Ok(Self { + cu_ids: vec![cu_id], + host_id: peer_id.to_base58(), + worker_id, + }) + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct SubnetResolveResult { + pub success: bool, + pub workers: Vec, + pub error: Vec, +} + +pub struct CCInitParams { + pub difficulty: Difficulty, + pub init_timestamp: U256, + pub current_timestamp: U256, + pub global_nonce: GlobalNonce, + pub current_epoch: U256, + pub epoch_duration: U256, + pub min_proofs_per_epoch: U256, + pub max_proofs_per_epoch: U256, +} diff --git a/crates/chain-data/src/utils.rs b/crates/chain-data/src/utils.rs index ab39999af5..137648838d 100644 --- a/crates/chain-data/src/utils.rs +++ b/crates/chain-data/src/utils.rs @@ -23,8 +23,8 @@ use libp2p_identity::{ParseError, PeerId}; /// Static prefix of the PeerId. Protobuf encoding + multihash::identity + length and so on. pub(crate) const PEER_ID_PREFIX: &[u8] = &[0, 36, 8, 1, 18, 32]; -pub fn parse_peer_id(bytes: Vec) -> Result { - let peer_id = [PEER_ID_PREFIX, &bytes].concat(); +pub fn parse_peer_id(bytes: &[u8]) -> Result { + let peer_id = [PEER_ID_PREFIX, bytes].concat(); PeerId::from_bytes(&peer_id) } @@ -41,7 +41,7 @@ pub fn peer_id_to_hex(peer_id: PeerId) -> String { } pub fn peer_id_from_hex(hex: &str) -> eyre::Result { - Ok(parse_peer_id(decode_hex(hex)?)?) + Ok(parse_peer_id(&decode_hex(hex)?)?) } #[cfg(test)] @@ -55,7 +55,7 @@ mod tests { PeerId::from_str("12D3KooWCGZ6t8by5ag5YMQW4k3HoPLaKdN5rB9DhAmDUeG8dj1N").unwrap(); assert_eq!( peer_id, - parse_peer_id(decode_hex(&hex[2..]).unwrap()).unwrap() + parse_peer_id(&decode_hex(&hex[2..]).unwrap()).unwrap() ); assert_eq!(hex, peer_id_to_hex(peer_id)); } diff --git a/crates/chain-listener/src/event/cc_activated.rs b/crates/chain-listener/src/event/cc_activated.rs index 2320014669..5aedc08e28 100644 --- a/crates/chain-listener/src/event/cc_activated.rs +++ b/crates/chain-listener/src/event/cc_activated.rs @@ -69,7 +69,7 @@ mod test { assert!(result.is_ok(), "can't parse data: {:?}", result); let result = result.unwrap(); assert_eq!( - parse_peer_id(result.peerId.to_vec()).unwrap().to_string(), + parse_peer_id(result.peerId.as_slice()).unwrap().to_string(), "12D3KooWP7RkvkBhbe7ATd451zxTifzF6Gm1uzCDadqQueET7EMe" // it's also the second topic ); diff --git a/crates/chain-listener/src/event/compute_unit_matched.rs b/crates/chain-listener/src/event/compute_unit_matched.rs index b008e7f91c..14202abd53 100644 --- a/crates/chain-listener/src/event/compute_unit_matched.rs +++ b/crates/chain-listener/src/event/compute_unit_matched.rs @@ -56,7 +56,7 @@ mod tests { 88, 198, 255, 218, 126, 170, 188, 84, 84, 39, 255, 137, 18, 55, 7, 139, 121, 207, 149, 42, 196, 115, 102, 160, 4, 47, 227, 62, 7, 53, 189, 15, ]; - let peer_id = parse_peer_id(bytes.into()).expect("parse peer_id from Token"); + let peer_id = parse_peer_id(&bytes).expect("parse peer_id from Token"); assert_eq!( peer_id.to_string(), String::from("12D3KooWFnv3Qc25eKpTDCNBoW1jXHMHHHSzcJoPkHai1b2dHNra") @@ -64,7 +64,7 @@ mod tests { let hex = "0x7a82a5feefcaad4a89c689412031e5f87c02b29e3fced583be5f05c7077354b7"; let bytes = decode_hex(hex).expect("parse peer_id from hex"); - let peer_id = parse_peer_id(bytes).expect("parse peer_id from Token"); + let peer_id = parse_peer_id(&bytes).expect("parse peer_id from Token"); assert_eq!( peer_id.to_string(), String::from("12D3KooWJ4bTHirdTFNZpCS72TAzwtdmavTBkkEXtzo6wHL25CtE") @@ -96,7 +96,7 @@ mod tests { let m = parse_log::(log1).expect("error parsing Match from log"); assert_eq!( - parse_peer_id(m.peerId.to_vec()).unwrap().to_string(), + parse_peer_id(m.peerId.as_slice()).unwrap().to_string(), "12D3KooWJ4bTHirdTFNZpCS72TAzwtdmavTBkkEXtzo6wHL25CtE" ); assert_eq!( @@ -120,7 +120,7 @@ mod tests { let m = parse_log::(log2).expect("error parsing Match from log"); assert_eq!( - parse_peer_id(m.peerId.to_vec()).unwrap().to_string(), + parse_peer_id(m.peerId.as_slice()).unwrap().to_string(), "12D3KooWJ4bTHirdTFNZpCS72TAzwtdmavTBkkEXtzo6wHL25CtE" ); assert_eq!( diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index f03063c08a..7b4f125c70 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -601,7 +601,7 @@ impl ChainListener { let topics = vec![topic, peer_id_to_hex(self.host_id)]; rpc_params![ "logs", - json!({"address": self.config.cc_contract_address, "topics": topics}) + json!({"address": self.config.diamond_contract_address, "topics": topics}) ] } @@ -609,7 +609,7 @@ impl ChainListener { let topic = UnitActivated::SIGNATURE_HASH.to_string(); rpc_params![ "logs", - json!({"address": self.config.cc_contract_address, "topics": vec![topic, encode_hex_0x(commitment_id.0)]}) + json!({"address": self.config.diamond_contract_address, "topics": vec![topic, encode_hex_0x(commitment_id.0)]}) ] } @@ -617,7 +617,7 @@ impl ChainListener { let topic = UnitDeactivated::SIGNATURE_HASH.to_string(); rpc_params![ "logs", - json!({"address": self.config.cc_contract_address, "topics": vec![topic, encode_hex_0x(commitment_id.0)]}) + json!({"address": self.config.diamond_contract_address, "topics": vec![topic, encode_hex_0x(commitment_id.0)]}) ] } @@ -628,7 +628,7 @@ impl ChainListener { ]; rpc_params![ "logs", - json!({"address": self.config.market_contract_address, "topics": topics}) + json!({"address": self.config.diamond_contract_address, "topics": topics}) ] } diff --git a/crates/created-swarm/src/swarm.rs b/crates/created-swarm/src/swarm.rs index bed2f22831..71dbf36bb1 100644 --- a/crates/created-swarm/src/swarm.rs +++ b/crates/created-swarm/src/swarm.rs @@ -313,7 +313,6 @@ pub struct SwarmConfig { pub extend_system_services: Vec, pub override_system_services_config: Option, pub http_port: u16, - pub connector_api_endpoint: Option, pub chain_config: Option, pub cc_events_dir: Option, pub network_key: NetworkKey, @@ -344,7 +343,6 @@ impl SwarmConfig { extend_system_services: vec![], override_system_services_config: None, http_port: 0, - connector_api_endpoint: None, chain_config: None, cc_events_dir: None, network_key, @@ -460,9 +458,6 @@ pub async fn create_swarm_with_runtime( }) .collect(); - if let Some(endpoint) = config.connector_api_endpoint.clone() { - resolved.system_services.decider.network_api_endpoint = endpoint; - } let management_peer_id = libp2p::identity::Keypair::from(config.management_keypair.clone()) .public() diff --git a/crates/nox-tests/Cargo.toml b/crates/nox-tests/Cargo.toml index ca3b0c5a58..8537e8f3c2 100644 --- a/crates/nox-tests/Cargo.toml +++ b/crates/nox-tests/Cargo.toml @@ -21,9 +21,11 @@ local-vm = { workspace = true } control-macro = { workspace = true } json-utils = { workspace = true } system-services = { workspace = true } -subnet-resolver = { workspace = true } fs-utils = { workspace = true } server-config = { workspace = true } +chain-connector = { workspace = true } +chain-data = { workspace = true } +hex-utils = { workspace = true } log-utils = { workspace = true } fluence-spell-dtos = { workspace = true } @@ -63,3 +65,5 @@ tempfile = { workspace = true } jsonrpsee = { workspace = true, features = ["server"] } hex = { workspace = true } clarity = { workspace = true } +alloy-sol-types = { workspace = true } +alloy-primitives = { workspace = true } diff --git a/crates/nox-tests/tests/builtin.rs b/crates/nox-tests/tests/builtin.rs index 32be3cef5e..ebe42f80c7 100644 --- a/crates/nox-tests/tests/builtin.rs +++ b/crates/nox-tests/tests/builtin.rs @@ -19,6 +19,11 @@ #[macro_use] extern crate fstrings; +use alloy_primitives::hex; +use alloy_sol_types::sol_data::Array; +use alloy_sol_types::SolType; +use chain_connector::SubnetResolveResult; +use chain_data::{parse_peer_id, peer_id_to_bytes}; use connected_client::ConnectedClient; use created_swarm::{ make_swarms, make_swarms_with_cfg, make_swarms_with_keypair, @@ -28,6 +33,7 @@ use eyre::{Report, WrapErr}; use fluence_keypair::KeyPair; use fluence_libp2p::RandomPeerId; use fluence_libp2p::Transport; +use hex_utils::encode_hex_0x; use itertools::Itertools; use json_utils::into_array; use libp2p::core::Multiaddr; @@ -42,11 +48,11 @@ use serde::Deserialize; use serde_json::{json, Value as JValue}; use service_modules::load_module; use std::collections::HashMap; +use std::default::Default; use std::str::FromStr; use std::time::Duration; -use subnet_resolver::SubnetResolveResult; use test_constants::PARTICLE_TTL; -use test_utils::create_service; +use test_utils::{create_service, get_default_chain_config}; #[derive(Deserialize, Debug)] struct NodeInfo { @@ -69,7 +75,7 @@ async fn identify() { (seq (call relay ("peer" "identify") [] info) (call client ("op" "return") [info]) - ) + ) "#, hashmap! { "relay" => json!(client.node.to_string()), @@ -1154,7 +1160,7 @@ async fn timeout_wait() { (seq (par (call relay ("peer" "timeout") [1000 "timed_out"] $ok_or_err) - (call "invalid_peer" ("op" "identity") ["never"] $ok_or_err) + (call "invalid_peer" ("op" "identity") ["never"] $ok_or_err) ) (canon %init_peer_id% $ok_or_err #ok_or_err) ) @@ -1996,7 +2002,7 @@ async fn json_builtins() { (seq (canon relay $single-pair #single-pair-1) (ap #single-pair-1 $pairs) - ) + ) ) ) (new $single-pair @@ -2053,7 +2059,7 @@ async fn json_builtins() { (ap #single-pair-4 $puts-pairs) ) ) - ) + ) (new $single-pair (seq (seq @@ -2371,17 +2377,24 @@ async fn aliases_restart() { #[ignore] #[tokio::test] async fn subnet_resolve() { - let expected_request = r#"{"jsonrpc":"2.0","id":0,"method":"eth_call","params":[{"data":"0x4b66a309","to":"0x9DcaFca9B88f49d91c38a32E7d9A86a7d9a37B04"},"latest"]}"#; - let expected_request: serde_json::Value = - serde_json::from_str(expected_request).expect("parse expected_request as json"); - - let jsonrpc = r#" - { - "jsonrpc": "2.0", - "id": 0, - "result": "0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000153aadfa1d6cd4c8a18f7eb26bd0b83ca10b664845cd72e2dd871f78b2006f5a7b5ecc6c89e9c2add9a9d3b08e7c8ed2155d980e48870b72cfb9c5c16a088ebfb0a510f7418603d18c14fd3e6dbc2bf4ce5b2e9ef3dac15428a9b31a7bf5a11a8000000000000000000000000627e730fd1361e6ffcee236dac08f82eaa8ac7cd0000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000000000000000000000000000000000000006c16216" - } - "#; + let cu_1 = chain_connector::Deal::ComputeUnit { + id: hex!("0000000000000000000000000000000000000000000000000000000000000001").into(), + workerId: peer_id_to_bytes(RandomPeerId::random()).into(), + peerId: peer_id_to_bytes(RandomPeerId::random()).into(), + provider: Default::default(), + joinedEpoch: Default::default(), + }; + + let cu_2 = chain_connector::Deal::ComputeUnit { + id: hex!("0000000000000000000000000000000000000000000000000000000000000002").into(), + workerId: Default::default(), + peerId: peer_id_to_bytes(RandomPeerId::random()).into(), + provider: Default::default(), + joinedEpoch: Default::default(), + }; + let resolve_result = encode_hex_0x(Array::::abi_encode( + &vec![cu_1.clone(), cu_2.clone()], + )); // Create a mock let mut server = mockito::Server::new_async().await; @@ -2392,30 +2405,29 @@ async fn subnet_resolve() { let body = req.body().expect("mock: get req body"); let body: serde_json::Value = serde_json::from_slice(body).expect("mock: parse req body as json"); + assert!(body.is_object()); assert_eq!( - body, expected_request, - "invalid request. expected {}, got {}", - expected_request, body + body.as_object().unwrap()["method"].as_str().unwrap(), + "eth_call" ); - jsonrpc.into() + + let id = body.as_object().unwrap()["id"].as_u64().unwrap(); + + json!({ + "jsonrpc": "2.0", + "id": id, + "result": resolve_result, + }) + .to_string() + .into() }) - // expect to receive this exact body in POST - // .match_body(r#"{"jsonrpc":"2.0","id":0,"method":"eth_getLogs","params":[{"fromBlock":"0x52","toBlock":"0x246","address":"0x6328bb918a01603adc91eae689b848a9ecaef26d","topics":["0x55e61a24ecdae954582245e5e611fb06905d6af967334fff4db72793bebc72a9","0x7a82a5feefcaad4a89c689412031e5f87c02b29e3fced583be5f05c7077354b7"]}]}"#) - // expect exactly 1 POST request .expect(2) .with_status(200) .with_header("content-type", "application/json") .create(); - let invalid_mock = server - .mock("POST", "/") - .expect(0) - .with_status(404) - .with_body("invalid mock was hit. Check that request body matches 'match_body' clause'") - .create(); - let swarms = make_swarms_with_cfg(1, move |mut cfg| { - cfg.connector_api_endpoint = Some(url.clone()); + cfg.chain_config = Some(get_default_chain_config(&url)); cfg }) .await; @@ -2463,28 +2475,28 @@ async fn subnet_resolve() { assert!(subnet.success, "{:?}", subnet.error); assert_eq!(subnet.error.len(), 0); - let pats: Vec<_> = subnet + let workers: Vec<_> = subnet .workers .iter() - .map(|p| (p.pat_id.as_str(), p.host_id.as_str(), p.worker_id.clone())) + .map(|p| (p.cu_ids.clone(), p.host_id.clone(), p.worker_id.clone())) .collect(); assert_eq!( - pats, + workers, vec![ ( - "0x53aadfa1d6cd4c8a18f7eb26bd0b83ca10b664845cd72e2dd871f78b2006f5a7", - "12D3KooWN4XNKgu76nwB7iKUXmE4FKCA5Ycak6SbSqLTaWo2nFsQ", - vec!["12D3KooWAWdwEujemZN1LQ87bPKSeAvDA1yMGistnL4yF8awuUqV".to_string()], + vec![encode_hex_0x(cu_1.id.0).to_string()], + parse_peer_id(&cu_1.peerId.0).unwrap().to_base58(), + vec![parse_peer_id(&cu_1.workerId.0).unwrap().to_base58()], ), - // TODO: add more nodes in subnet later + ( + vec![encode_hex_0x(cu_2.id.0).to_string()], + parse_peer_id(&cu_2.peerId.0).unwrap().to_base58(), + vec![] + ) ] ); - // assert that there was no invalid requests - invalid_mock.assert(); - - // TODO: how to check request body? // check that mock was called mock.assert(); } diff --git a/crates/nox-tests/tests/workers.rs b/crates/nox-tests/tests/workers.rs index 86eada06d3..c42cff9129 100644 --- a/crates/nox-tests/tests/workers.rs +++ b/crates/nox-tests/tests/workers.rs @@ -16,14 +16,14 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - use connected_client::ConnectedClient; -use created_swarm::make_swarms; +use created_swarm::{make_swarms, make_swarms_with_cfg}; use eyre::Context; use hex::FromHex; use log_utils::enable_logs; use maplit::hashmap; use serde_json::{json, Value}; +use test_utils::get_default_chain_config; use workers::CUID; pub(crate) async fn create_worker(client: &mut ConnectedClient, deal_id: &str) -> String { @@ -153,7 +153,21 @@ async fn test_resolve_subnet_on_worker() { .wrap_err("decode test data") .unwrap(); - let swarms = make_swarms(1).await; + // Create a mock + let mut server = mockito::Server::new_async().await; + let url = server.url(); + let _mock = server + .mock("POST", "/") + .expect(1) + .with_status(429) + .with_header("content-type", "application/json") + .create(); + + let swarms = make_swarms_with_cfg(1, move |mut cfg| { + cfg.chain_config = Some(get_default_chain_config(&url)); + cfg + }) + .await; let mut client = ConnectedClient::connect_with_keypair( swarms[0].multiaddr.clone(), @@ -179,7 +193,7 @@ async fn test_resolve_subnet_on_worker() { let expected = { let error = Value::Array(vec![Value::String( - "error sending jsonrpc request: 'Request rejected `429`'".to_string(), + "RPC error: Request rejected `429`".to_string(), )]); let mut object_map = serde_json::Map::new(); object_map.insert("error".to_string(), error); diff --git a/crates/server-config/src/defaults.rs b/crates/server-config/src/defaults.rs index b6821517e6..943c766237 100644 --- a/crates/server-config/src/defaults.rs +++ b/crates/server-config/src/defaults.rs @@ -253,23 +253,6 @@ pub fn default_registry_replicate_spell_period_sec() -> u32 { 3600 } -pub fn default_decider_network_api_endpoint() -> String { - "https://endpoints.omniatech.io/v1/matic/mumbai/public".to_string() -} - -pub fn default_matcher_address() -> String { - // on mumbai - "0x93A2897deDcC5478a9581808F5EC25F4FadbC312".to_string() -} - -pub fn default_decider_start_block_hex() -> String { - "latest".to_string() -} - -pub fn default_decider_worker_gas() -> u64 { - 210_000 -} - pub fn default_ipfs_binary_path() -> PathBuf { "/usr/bin/ipfs".into() } @@ -278,11 +261,6 @@ pub fn default_curl_binary_path() -> PathBuf { "/usr/bin/curl".into() } -pub fn default_decider_network_id() -> u64 { - // 80001 = polygon mumbai - 80001 -} - pub fn default_effectors() -> HashMap)> { hashmap! { "curl".to_string() => ("bafkreids22lgia5bqs63uigw4mqwhsoxvtnkpfqxqy5uwyyerrldsr32ce".to_string(), hashmap! { diff --git a/crates/server-config/src/node_config.rs b/crates/server-config/src/node_config.rs index 948b9fdb90..bf09bcfe1d 100644 --- a/crates/server-config/src/node_config.rs +++ b/crates/server-config/src/node_config.rs @@ -48,7 +48,7 @@ use crate::avm_config::AVMConfig; use crate::kademlia_config::{KademliaConfig, UnresolvedKademliaConfig}; use crate::keys::{decode_key, decode_secret_key, load_key}; use crate::services_config::ServicesConfig; -use crate::system_services_config::{ServiceKey, SystemServicesConfig}; +use crate::system_services_config::SystemServicesConfig; use crate::BootstrapConfig; use super::defaults::*; @@ -213,9 +213,7 @@ impl TryFrom<&Network> for StreamProtocol { } impl UnresolvedNodeConfig { - pub fn resolve(mut self, persistent_base_dir: &Path) -> eyre::Result { - self.load_system_services_envs(); - + pub fn resolve(self, persistent_base_dir: &Path) -> eyre::Result { let bootstrap_nodes = match self.local { Some(true) => vec![], _ => self.bootstrap_nodes, @@ -288,99 +286,6 @@ impl UnresolvedNodeConfig { Ok(result) } - - // This is a temporary solution to save backward compatibility for some time - // Couldn't figure out how to use layered configs for this - // Print warning not to forget to fix it in the future - fn load_system_services_envs(&mut self) { - if let Ok(aqua_ipfs_external_addr) = - std::env::var("FLUENCE_ENV_AQUA_IPFS_EXTERNAL_API_MULTIADDR") - { - log::warn!( - "Override configuration of aqua-ipfs system service (external multiaddr) from ENV" - ); - self.system_services.aqua_ipfs.external_api_multiaddr = aqua_ipfs_external_addr; - } - - if let Ok(aqua_ipfs_local_addr) = std::env::var("FLUENCE_ENV_AQUA_IPFS_LOCAL_API_MULTIADDR") - { - log::warn!( - "Override configuration of aqua-ipfs system service (local multiaddr) from ENV" - ); - self.system_services.aqua_ipfs.local_api_multiaddr = aqua_ipfs_local_addr; - } - - if let Ok(enable_decider) = std::env::var("FLUENCE_ENV_CONNECTOR_JOIN_ALL_DEALS") { - match enable_decider.as_str() { - "true" => { - log::warn!( - "Override configuration of system services (enable decider) from ENV" - ); - self.system_services.enable.push(ServiceKey::Decider); - } - "false" => { - log::warn!( - "Override configuration of system services (disable decider) from ENV" - ); - self.system_services - .enable - .retain(|key| *key != ServiceKey::Decider); - } - _ => {} - } - } - if let Ok(decider_api_endpoint) = std::env::var("FLUENCE_ENV_CONNECTOR_API_ENDPOINT") { - log::warn!( - "Override configuration of decider system spell (api endpoint) from ENV to {}", - decider_api_endpoint - ); - self.system_services.decider.network_api_endpoint = decider_api_endpoint; - } - - if let Ok(decider_contract_addr) = std::env::var("FLUENCE_ENV_CONNECTOR_CONTRACT_ADDRESS") { - log::warn!( - "Override configuration of decider system spell (contract address) from ENV to {}", - decider_contract_addr - ); - self.system_services.decider.matcher_address = decider_contract_addr; - } - - if let Ok(decider_from_block) = std::env::var("FLUENCE_ENV_CONNECTOR_FROM_BLOCK") { - log::warn!( - "Override configuration of decider system spell (from block) from ENV to {}", - decider_from_block - ); - self.system_services.decider.start_block = decider_from_block; - } - - if let Ok(decider_wallet_key) = std::env::var("FLUENCE_ENV_CONNECTOR_WALLET_KEY") { - log::warn!("Override configuration of decider system spell (wallet key) from ENV"); - self.system_services.decider.wallet_key = Some(decider_wallet_key); - } - - if let Ok(worker_ipfs_multiaddr) = std::env::var("FLUENCE_ENV_DECIDER_IPFS_MULTIADDR") { - log::warn!( - "Override configuration of decider system spell (ipfs multiaddr) from ENV to {}", - worker_ipfs_multiaddr - ); - self.system_services.decider.worker_ipfs_multiaddr = worker_ipfs_multiaddr; - } - - if let Ok(worker_gas) = std::env::var("FLUENCE_ENV_CONNECTOR_WORKER_GAS") { - match worker_gas.parse() { - Ok(worker_gas) => { - log::warn!( - "Override configuration of decider system spell (worker gas) from ENV to {}", worker_gas - ); - self.system_services.decider.worker_gas = worker_gas; - } - Err(err) => log::warn!( - "Unable to override worker gas, value is not a valid u64: {}", - err - ), - } - } - } } #[derive(Clone, Derivative, Serialize)] @@ -634,10 +539,7 @@ impl KeypairConfig { #[derivative(Debug)] pub struct ChainConfig { pub http_endpoint: String, - // TODO get all addresses from Core contract - pub core_contract_address: String, - pub cc_contract_address: String, - pub market_contract_address: String, + pub diamond_contract_address: String, pub network_id: u64, pub wallet_key: PrivateKey, /// If none, comes from the chain diff --git a/crates/server-config/src/system_services_config.rs b/crates/server-config/src/system_services_config.rs index a1bfb7d26c..0fd1e30b16 100644 --- a/crates/server-config/src/system_services_config.rs +++ b/crates/server-config/src/system_services_config.rs @@ -131,18 +131,6 @@ pub struct DeciderConfig { pub worker_period_sec: u32, #[serde(default = "default_ipfs_multiaddr")] pub worker_ipfs_multiaddr: String, - #[serde(default = "default_decider_network_api_endpoint")] - pub network_api_endpoint: String, - #[serde(default = "default_decider_network_id")] - pub network_id: u64, - #[serde(default = "default_matcher_address")] - pub matcher_address: String, - #[serde(default = "default_decider_start_block_hex")] - pub start_block: String, - #[serde(default = "default_decider_worker_gas")] - pub worker_gas: u64, - #[serde(default, skip_serializing)] - pub wallet_key: Option, } impl Default for DeciderConfig { @@ -151,12 +139,6 @@ impl Default for DeciderConfig { decider_period_sec: default_decider_spell_period_sec(), worker_period_sec: default_worker_spell_period_sec(), worker_ipfs_multiaddr: default_ipfs_multiaddr(), - network_api_endpoint: default_decider_network_api_endpoint(), - network_id: default_decider_network_id(), - matcher_address: default_matcher_address(), - start_block: default_decider_start_block_hex(), - worker_gas: default_decider_worker_gas(), - wallet_key: None, } } } diff --git a/crates/subnet-resolver/Cargo.toml b/crates/subnet-resolver/Cargo.toml deleted file mode 100644 index dc41533afe..0000000000 --- a/crates/subnet-resolver/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "subnet-resolver" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -ethabi = { workspace = true } -jsonrpsee = { workspace = true, features = ["http-client", "macros"] } - -libp2p-identity = { workspace = true, features = ["peerid"] } -serde_json = { workspace = true } -thiserror = { workspace = true } -tokio = { workspace = true } -eyre = { workspace = true } -serde = { workspace = true } -hex-utils = { workspace = true } -hex = { workspace = true } -chain-data = { workspace = true } - diff --git a/crates/subnet-resolver/src/error.rs b/crates/subnet-resolver/src/error.rs deleted file mode 100644 index 197eb7187a..0000000000 --- a/crates/subnet-resolver/src/error.rs +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Nox Fluence Peer - * - * Copyright (C) 2024 Fluence DAO - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation version 3 of the - * License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -use chain_data::ChainDataError; -use libp2p_identity::ParseError; -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum ResolveSubnetError { - #[error("error encoding function: '{0}'")] - EncodeFunction(#[from] ethabi::Error), - #[error("error sending jsonrpc request: '{0}'")] - RpcError(#[from] jsonrpsee::core::client::Error), - #[error(transparent)] - ChainData(#[from] ChainDataError), - #[error("getPATs response is empty")] - Empty, - #[error("'{1}' from getPATs is not a valid PeerId")] - InvalidPeerId(#[source] ParseError, &'static str), - #[error("Invalid deal id '{0}': invalid length")] - InvalidDealId(String), -} diff --git a/crates/subnet-resolver/src/lib.rs b/crates/subnet-resolver/src/lib.rs deleted file mode 100644 index 6b211e7610..0000000000 --- a/crates/subnet-resolver/src/lib.rs +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Nox Fluence Peer - * - * Copyright (C) 2024 Fluence DAO - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation version 3 of the - * License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#![feature(try_blocks)] -mod error; -mod resolve; - -pub use resolve::{resolve_subnet, SubnetResolveResult, Worker}; diff --git a/crates/subnet-resolver/src/resolve.rs b/crates/subnet-resolver/src/resolve.rs deleted file mode 100644 index 7b5fd9cd84..0000000000 --- a/crates/subnet-resolver/src/resolve.rs +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Nox Fluence Peer - * - * Copyright (C) 2024 Fluence DAO - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation version 3 of the - * License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -use crate::error::ResolveSubnetError; - -use ethabi::ParamType::{Address, Array, FixedBytes, Tuple, Uint}; -use ethabi::{Function, ParamType, StateMutability, Token}; -use jsonrpsee::core::client::ClientT; -use jsonrpsee::http_client::HttpClientBuilder; -use jsonrpsee::rpc_params; - -use chain_data::{next_opt, parse_peer_id, ChainDataError}; -use hex_utils::decode_hex; -use serde::{Deserialize, Serialize}; -use serde_json::json; - -/// Parse data from chain. Accepts data with and without "0x" prefix. -pub fn parse_chain_data(data: &str) -> Result, ChainDataError> { - if data.is_empty() { - return Err(ChainDataError::Empty); - } - let data = decode_hex(data).map_err(ChainDataError::DecodeHex)?; - let signature: ParamType = Array(Box::new(Tuple(vec![ - // bytes32 id - FixedBytes(32), - // bytes32 workerId - FixedBytes(32), - // bytes32 peerId - FixedBytes(32), - // address provider - Address, - // uint256 joinedEpoch - Uint(256), - ]))); - Ok(ethabi::decode(&[signature], &data)?) -} - -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct Worker { - pub pat_id: String, - pub host_id: String, - pub worker_id: Vec, -} -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct SubnetResolveResult { - pub success: bool, - pub workers: Vec, - pub error: Vec, -} - -fn decode_pats(data: String) -> Result, ResolveSubnetError> { - let tokens = parse_chain_data(&data)?; - let tokens = tokens.into_iter().next().ok_or(ResolveSubnetError::Empty)?; - let tokens = tokens - .into_array() - .ok_or(ChainDataError::InvalidParsedToken("response"))?; - let mut result = vec![]; - for token in tokens { - let tuple = token - .into_tuple() - .ok_or(ChainDataError::InvalidParsedToken("tuple"))?; - let mut tuple = tuple.into_iter(); - - let pat_id = next_opt(&mut tuple, "pat_id", Token::into_fixed_bytes)?; - let pat_id = hex::encode(pat_id); - - let worker_id = next_opt(&mut tuple, "compute_worker_id", Token::into_fixed_bytes)?; - // if all bytes are 0, then worker_id is considered empty - let all_zeros = worker_id.iter().all(|b| *b == 0); - let worker_id = if all_zeros { - vec![] - } else { - let worker_id = parse_peer_id(worker_id) - .map_err(|e| ResolveSubnetError::InvalidPeerId(e, "worker_id"))?; - vec![worker_id.to_string()] - }; - - let peer_id = next_opt(&mut tuple, "compute_peer_id", Token::into_fixed_bytes)?; - let peer_id = parse_peer_id(peer_id) - .map_err(|e| ResolveSubnetError::InvalidPeerId(e, "compute_peer_id"))?; - - let pat = Worker { - pat_id: format!("0x{}", pat_id), - host_id: peer_id.to_string(), - worker_id, - }; - result.push(pat); - } - - Ok(result) -} - -pub fn validate_deal_id(deal_id: String) -> Result { - // 40 hex chars + 2 for "0x" prefix - if deal_id.len() == 42 && deal_id.starts_with("0x") { - Ok(deal_id) - } else if deal_id.len() == 40 { - Ok(format!("0x{}", deal_id)) - } else { - Err(ResolveSubnetError::InvalidDealId(deal_id)) - } -} - -pub async fn resolve_subnet(deal_id: String, api_endpoint: &str) -> SubnetResolveResult { - let res: Result<_, ResolveSubnetError> = try { - let deal_id = validate_deal_id(deal_id)?; - // Description of the `getComputeUnits` function from the `chain.workers` smart contract on chain - #[allow(deprecated)] - let input = Function { - name: String::from("getComputeUnits"), - inputs: vec![], - outputs: vec![], - constant: None, - state_mutability: StateMutability::View, - } - .encode_input(&[])?; - let input = format!("0x{}", hex::encode(input)); - let client = HttpClientBuilder::default().build(api_endpoint)?; - let params = rpc_params![json!({ "data": input, "to": deal_id }), json!("latest")]; - let response = client.request("eth_call", params).await; - - let pats = response?; - - decode_pats(pats)? - }; - - match res { - Ok(workers) => SubnetResolveResult { - success: true, - workers, - error: vec![], - }, - Err(err) => SubnetResolveResult { - success: false, - workers: vec![], - error: vec![format!("{}", err)], - }, - } -} diff --git a/crates/system-services/Cargo.toml b/crates/system-services/Cargo.toml index bfb105bf29..8ebfc62084 100644 --- a/crates/system-services/Cargo.toml +++ b/crates/system-services/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] aqua-ipfs-distro = "=0.6.0" -decider-distro = "=0.7.0" +decider-distro = "=0.7.1" registry-distro = "=0.9.4" trust-graph-distro = "=0.4.11" diff --git a/crates/test-utils/Cargo.toml b/crates/test-utils/Cargo.toml index 97cbbe76a7..c5185ae9ae 100644 --- a/crates/test-utils/Cargo.toml +++ b/crates/test-utils/Cargo.toml @@ -17,3 +17,5 @@ maplit = { workspace = true } base64 = { workspace = true } eyre = { workspace = true } cpu-utils = { workspace = true } +clarity = { workspace = true } +server-config = { workspace = true } diff --git a/crates/test-utils/src/utils.rs b/crates/test-utils/src/utils.rs index 82ffcead77..1ba38a393e 100644 --- a/crates/test-utils/src/utils.rs +++ b/crates/test-utils/src/utils.rs @@ -16,8 +16,10 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - +use clarity::PrivateKey; use ivalue_utils::IValue; +use server_config::ChainConfig; +use std::str::FromStr; #[derive(Debug, Clone)] pub struct RetStruct { @@ -63,3 +65,17 @@ pub fn string_result(ret: RetStruct) -> Result { Err(ret.error) } } + +pub fn get_default_chain_config(url: &str) -> ChainConfig { + ChainConfig { + http_endpoint: url.to_string(), + diamond_contract_address: "".to_string(), + network_id: 0, + wallet_key: PrivateKey::from_str( + "0x97a2456e78c4894c62eef6031972d1ca296ed40bf311ab54c231f13db59fc428", + ) + .unwrap(), + default_base_fee: None, + default_priority_fee: None, + } +} diff --git a/crates/types/src/deal_id.rs b/crates/types/src/deal_id.rs index f7f263b485..26a08a1943 100644 --- a/crates/types/src/deal_id.rs +++ b/crates/types/src/deal_id.rs @@ -25,6 +25,12 @@ use std::fmt::Display; pub struct DealId(String); impl DealId { + // TODO: always validate on creation + /// 40 hex chars + 2 for "0x" prefix; Deal ID is EVM contract address; + pub fn is_valid(&self) -> bool { + Self::normalize(&self.0).len() == 40 + } + pub fn normalize(str: &str) -> String { str.trim_start_matches("0x").to_ascii_lowercase() } diff --git a/nox/Cargo.toml b/nox/Cargo.toml index 5da8068775..aaf849a840 100644 --- a/nox/Cargo.toml +++ b/nox/Cargo.toml @@ -16,7 +16,7 @@ connection-pool = { workspace = true } aquamarine = { workspace = true } sorcerer = { workspace = true } health = { workspace = true } -core-distributor = { workspace = true } +core-distributor = { workspace = true, features = ["dummy"] } dhat = { version = "0.3.2", optional = true } serde_json = { workspace = true } fluence-libp2p = { workspace = true } diff --git a/nox/src/node.rs b/nox/src/node.rs index 1dc400f98f..820fbb7083 100644 --- a/nox/src/node.rs +++ b/nox/src/node.rs @@ -311,7 +311,6 @@ impl Node { let builtins_config = BuiltinsConfig::new( services_config, - config.system_services.decider.network_api_endpoint.clone(), config.dir_config.services_persistent_dir.clone(), config.node_config.allowed_effectors.clone(), config diff --git a/nox/tests/http_expected_config.toml b/nox/tests/http_expected_config.toml index 17883ec59e..cd018b2cf3 100644 --- a/nox/tests/http_expected_config.toml +++ b/nox/tests/http_expected_config.toml @@ -108,11 +108,6 @@ ipfs_binary_path = "/usr/bin/ipfs" decider_period_sec = 10 worker_period_sec = 900 worker_ipfs_multiaddr = "/ip4/127.0.0.1/tcp/5001" -network_api_endpoint = "http://127.0.0.1:8545" -network_id = 31337 -matcher_address = "0x0e1F3B362E22B2Dc82C9E35d6e62998C7E8e2349" -start_block = "earliest" -worker_gas = 210000 [node_config.system_services.registry] registry_period_sec = 3600 diff --git a/particle-builtins/Cargo.toml b/particle-builtins/Cargo.toml index d8f3781163..fba229a193 100644 --- a/particle-builtins/Cargo.toml +++ b/particle-builtins/Cargo.toml @@ -18,7 +18,6 @@ peer-metrics = { workspace = true } uuid-utils = { workspace = true } workers = { workspace = true } service-modules = { workspace = true } -subnet-resolver = { workspace = true } types = { workspace = true } libp2p = { workspace = true } libp2p-kad = { workspace = true } diff --git a/particle-builtins/src/builtins.rs b/particle-builtins/src/builtins.rs index 80db0aac89..3e2ce1bef1 100644 --- a/particle-builtins/src/builtins.rs +++ b/particle-builtins/src/builtins.rs @@ -125,7 +125,6 @@ pub struct Builtins { #[derive(Debug)] pub struct BuiltinsConfig { pub particle_app_services: ParticleAppServicesConfig, - pub connector_api_endpoint: String, /// Dir to store .wasm modules and their configs pub modules_dir: PathBuf, /// Path of the blueprint directory containing blueprints and wasm modules @@ -141,7 +140,6 @@ pub struct BuiltinsConfig { impl BuiltinsConfig { pub fn new( particle_app_services: ParticleAppServicesConfig, - connector_api_endpoint: String, persistent_dir: PathBuf, allowed_effectors: HashMap>, mounted_binaries_mapping: HashMap, @@ -195,7 +193,6 @@ impl BuiltinsConfig { }; Ok(Self { particle_app_services, - connector_api_endpoint, blueprint_dir, modules_dir, allowed_effectors, @@ -397,7 +394,6 @@ where ("vm", "create") => wrap(self.create_vm(args, particle).await), - ("subnet", "resolve") => wrap(self.subnet_resolve(args).await), ("run-console", "print") => { self.guard_protected(&particle).await?; @@ -1220,14 +1216,6 @@ where .map_err(|_| JError::new(format!("Error reading vault file `{path}`"))) } - async fn subnet_resolve(&self, args: Args) -> Result { - let mut args = args.function_args.into_iter(); - let deal_id: String = Args::next("deal_id", &mut args)?; - let result = - subnet_resolver::resolve_subnet(deal_id, &self.config.connector_api_endpoint).await; - Ok(json!(result)) - } - async fn guard_protected(&self, particle: &ParticleParams) -> Result<(), JError> { if self.is_worker_spell(particle).await || self.scopes.is_host(particle.init_peer_id)