From 53017e216c86c6f3be1ac6d7f6bf004a384bbe4d Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Tue, 26 Nov 2024 21:04:07 +0700 Subject: [PATCH 01/13] feat(sdk): ban addresses failed in sdk --- Cargo.lock | 15 +++++ packages/rs-dapi-client/Cargo.toml | 2 + packages/rs-dapi-client/src/address_list.rs | 46 ++++++-------- packages/rs-dapi-client/src/dapi_client.rs | 31 +++------- packages/rs-sdk/src/platform/fetch.rs | 2 +- packages/rs-sdk/src/platform/fetch_many.rs | 4 +- .../rs-sdk/src/platform/fetch_unproved.rs | 5 +- packages/rs-sdk/src/sdk.rs | 10 +--- packages/rs-sdk/src/sync.rs | 60 ++++++++++++++++--- 9 files changed, 102 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 27dd9589d0..1c445a563a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,6 +1212,20 @@ dependencies = [ "serde", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "dashpay-contract" version = "1.6.0-dev.1" @@ -3991,6 +4005,7 @@ dependencies = [ "backon", "chrono", "dapi-grpc", + "dashmap", "futures", "hex", "http-serde", diff --git a/packages/rs-dapi-client/Cargo.toml b/packages/rs-dapi-client/Cargo.toml index 0f3ad4c88f..08a75c0a06 100644 --- a/packages/rs-dapi-client/Cargo.toml +++ b/packages/rs-dapi-client/Cargo.toml @@ -37,5 +37,7 @@ lru = { version = "0.12.3" } serde = { version = "1.0.197", optional = true, features = ["derive"] } serde_json = { version = "1.0.120", optional = true } chrono = { version = "0.4.38", features = ["serde"] } +dashmap = "6.1.0" + [dev-dependencies] tokio = { version = "1.40", features = ["macros"] } diff --git a/packages/rs-dapi-client/src/address_list.rs b/packages/rs-dapi-client/src/address_list.rs index 0c21ecc0b1..000d79e5be 100644 --- a/packages/rs-dapi-client/src/address_list.rs +++ b/packages/rs-dapi-client/src/address_list.rs @@ -3,10 +3,12 @@ use chrono::Utc; use dapi_grpc::tonic::codegen::http; use dapi_grpc::tonic::transport::Uri; +use dashmap::setref::multiple::RefMulti; +use dashmap::DashSet; use rand::{rngs::SmallRng, seq::IteratorRandom, SeedableRng}; -use std::collections::HashSet; use std::hash::{Hash, Hasher}; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; const DEFAULT_BASE_BAN_PERIOD: Duration = Duration::from_secs(60); @@ -103,7 +105,7 @@ pub enum AddressListError { /// for [DapiRequest](crate::DapiRequest) execution. #[derive(Debug, Clone)] pub struct AddressList { - addresses: HashSet
, + addresses: Arc>, base_ban_period: Duration, } @@ -128,14 +130,14 @@ impl AddressList { /// Creates an empty [AddressList] with adjustable base ban time. pub fn with_settings(base_ban_period: Duration) -> Self { AddressList { - addresses: HashSet::new(), + addresses: Arc::new(DashSet::new()), base_ban_period, } } /// Bans address - pub(crate) fn ban_address(&mut self, address: &Address) -> Result<(), AddressListError> { - if !self.addresses.remove(address) { + pub fn ban_address(&self, address: &Address) -> Result<(), AddressListError> { + if self.addresses.remove(address).is_none() { return Err(AddressListError::AddressNotFound(address.uri.clone())); }; @@ -148,8 +150,8 @@ impl AddressList { } /// Clears address' ban record - pub(crate) fn unban_address(&mut self, address: &Address) -> Result<(), AddressListError> { - if !self.addresses.remove(address) { + pub fn unban_address(&self, address: &Address) -> Result<(), AddressListError> { + if self.addresses.remove(address).is_none() { return Err(AddressListError::AddressNotFound(address.uri.clone())); }; @@ -177,29 +179,19 @@ impl AddressList { } /// Randomly select a not banned address. - pub fn get_live_address(&self) -> Option<&Address> { + pub fn get_live_address(&self) -> Option> { let mut rng = SmallRng::from_entropy(); - self.unbanned().into_iter().choose(&mut rng) - } - - /// Get all addresses that are not banned. - fn unbanned(&self) -> Vec<&Address> { let now = chrono::Utc::now(); self.addresses .iter() - .filter(|addr| { + .filter(move |addr| { addr.banned_until .map(|banned_until| banned_until < now) .unwrap_or(true) }) - .collect() - } - - /// Get number of available, not banned addresses. - pub fn available(&self) -> usize { - self.unbanned().len() + .choose(&mut rng) } /// Get number of all addresses, both banned and not banned. @@ -214,6 +206,11 @@ impl AddressList { pub fn is_empty(&self) -> bool { self.addresses.is_empty() } + + /// Get an iterator over all addresses. + pub fn iter(&self) -> impl Iterator> { + self.addresses.iter() + } } // TODO: Must be changed to FromStr @@ -238,12 +235,3 @@ impl FromIterator for AddressList { address_list } } - -impl IntoIterator for AddressList { - type Item = Address; - type IntoIter = std::collections::hash_set::IntoIter
; - - fn into_iter(self) -> Self::IntoIter { - self.addresses.into_iter() - } -} diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 579c62e015..dd7f650e5e 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -5,7 +5,7 @@ use dapi_grpc::mock::Mockable; use dapi_grpc::tonic::async_trait; use std::fmt::Debug; use std::sync::atomic::AtomicUsize; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; use tracing::Instrument; @@ -72,7 +72,7 @@ impl Mockable for DapiClientError { /// Access point to DAPI. #[derive(Debug, Clone)] pub struct DapiClient { - address_list: Arc>, + address_list: AddressList, settings: RequestSettings, pool: ConnectionPool, #[cfg(feature = "dump")] @@ -86,7 +86,7 @@ impl DapiClient { let address_count = 3 * address_list.len(); Self { - address_list: Arc::new(RwLock::new(address_list)), + address_list, settings, pool: ConnectionPool::new(address_count), #[cfg(feature = "dump")] @@ -95,7 +95,7 @@ impl DapiClient { } /// Return the [DapiClient] address list. - pub fn address_list(&self) -> &Arc> { + pub fn address_list(&self) -> &AddressList { &self.address_list } } @@ -140,18 +140,13 @@ impl DapiRequestExecutor for DapiClient { let retries_counter = Arc::clone(retries_counter_arc_ref); // Try to get an address to initialize transport on: - let address_list = self + let address_result = self .address_list - .read() - .expect("can't get address list for read"); - - let address_result = address_list .get_live_address() + .as_deref() .cloned() .ok_or(DapiClientError::NoAvailableAddresses); - drop(address_list); - let _span = tracing::trace_span!( "execute request", address = ?address_result, @@ -203,12 +198,7 @@ impl DapiRequestExecutor for DapiClient { Ok(_) => { // Unban the address if it was banned and node responded successfully this time if address.is_banned() { - let mut address_list = self - .address_list - .write() - .expect("can't get address list for write"); - - address_list.unban_address(&address).map_err(|error| { + self.address_list.unban_address(&address).map_err(|error| { ExecutionError { inner: DapiClientError::AddressList(error), retries: retries_counter @@ -223,12 +213,7 @@ impl DapiRequestExecutor for DapiClient { Err(error) => { if error.can_retry() { if applied_settings.ban_failed_address { - let mut address_list = self - .address_list - .write() - .expect("can't get address list for write"); - - address_list.ban_address(&address).map_err(|error| { + self.address_list.ban_address(&address).map_err(|error| { ExecutionError { inner: DapiClientError::AddressList(error), retries: retries_counter diff --git a/packages/rs-sdk/src/platform/fetch.rs b/packages/rs-sdk/src/platform/fetch.rs index 80564fbdf2..7fdf5e1974 100644 --- a/packages/rs-sdk/src/platform/fetch.rs +++ b/packages/rs-sdk/src/platform/fetch.rs @@ -195,7 +195,7 @@ where .dapi_client_settings .override_by(settings.unwrap_or_default()); - retry(settings, fut).await.into_inner() + retry(sdk.address_list(), settings, fut).await.into_inner() } /// Fetch single object from Platform. diff --git a/packages/rs-sdk/src/platform/fetch_many.rs b/packages/rs-sdk/src/platform/fetch_many.rs index 360a3559b3..1fcdb1043a 100644 --- a/packages/rs-sdk/src/platform/fetch_many.rs +++ b/packages/rs-sdk/src/platform/fetch_many.rs @@ -252,7 +252,7 @@ where .dapi_client_settings .override_by(settings.unwrap_or_default()); - retry(settings, fut).await.into_inner() + retry(sdk.address_list(), settings, fut).await.into_inner() } /// Fetch multiple objects from Platform by their identifiers. @@ -327,7 +327,7 @@ impl FetchMany for Document { ) -> Result { let document_query: &DocumentQuery = &query.query(sdk.prove())?; - retry(sdk.dapi_client_settings, |settings| async move { + retry(sdk.address_list(), sdk.dapi_client_settings, |settings| async move { let request = document_query.clone(); let ExecutionResponse { diff --git a/packages/rs-sdk/src/platform/fetch_unproved.rs b/packages/rs-sdk/src/platform/fetch_unproved.rs index ac3a682f81..d98d598844 100644 --- a/packages/rs-sdk/src/platform/fetch_unproved.rs +++ b/packages/rs-sdk/src/platform/fetch_unproved.rs @@ -55,7 +55,6 @@ where /// - `settings`: Request settings for the connection to Platform. /// /// ## Returns - /// Returns: /// * `Ok(Some(Self))` when object is found. /// * `Ok(None)` when object is not found. /// * [`Err(Error)`](Error) when an error occurs. @@ -107,7 +106,9 @@ where }; let settings = sdk.dapi_client_settings.override_by(settings); - retry(settings, closure).await.into_inner() + retry(sdk.address_list(), settings, closure) + .await + .into_inner() } } diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index 3fd570e206..a89086d39e 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -554,15 +554,9 @@ impl Sdk { } /// Return the [DapiClient] address list - pub fn address_list(&self) -> Result { + pub fn address_list(&self) -> &AddressList { match &self.inner { - SdkInstance::Dapi { dapi, version: _ } => { - let address_list_arc = dapi.address_list(); - let address_list_lock = address_list_arc - .read() - .map_err(|e| format!("Failed to read address list: {e}"))?; - Ok(address_list_lock.clone()) - } + SdkInstance::Dapi { dapi, .. } => dapi.address_list(), #[cfg(feature = "mocks")] SdkInstance::Mock { .. } => { unimplemented!("mock Sdk does not have address list") diff --git a/packages/rs-sdk/src/sync.rs b/packages/rs-sdk/src/sync.rs index 38a878e174..a2e4b00f1d 100644 --- a/packages/rs-sdk/src/sync.rs +++ b/packages/rs-sdk/src/sync.rs @@ -4,15 +4,17 @@ //! inside a tokio runtime. This module spawns async futures in active tokio runtime, and retrieves the result //! using a channel. +use crate::Error; use arc_swap::ArcSwap; use drive_proof_verifier::error::ContextProviderError; -use rs_dapi_client::{CanRetry, ExecutionResult, RequestSettings}; +use rs_dapi_client::{AddressList, CanRetry, ExecutionError, ExecutionResult, RequestSettings}; use std::{ fmt::Debug, future::Future, sync::{mpsc::SendError, Arc}, }; use tokio::{runtime::TryCurrentError, sync::Mutex}; + #[derive(Debug, thiserror::Error)] pub enum AsyncError { /// Not running inside tokio runtime @@ -138,8 +140,9 @@ async fn worker( /// } /// #[tokio::main] /// async fn main() { +/// let address_list = rs_dapi_client::AddressList::default(); /// let global_settings = RequestSettings::default(); -/// dash_sdk::sync::retry(global_settings, retry_test_function).await.expect_err("should fail"); +/// dash_sdk::sync::retry(&address_list, global_settings, retry_test_function).await.expect_err("should fail"); /// } /// ``` /// @@ -154,6 +157,7 @@ async fn worker( /// /// - [`::backon`] crate that is used by this function. pub async fn retry( + address_list: &AddressList, settings: RequestSettings, future_factory_fn: FutureFactoryFn, ) -> ExecutionResult @@ -187,21 +191,59 @@ where async move { let settings = closure_settings.load_full().clone(); let mut func = inner_fn.lock().await; - (*func)(*settings).await + let result = (*func)(*settings).await; + + match &result { + Ok(response) => { + // Unban the address if it was banned and node responded successfully this time + if response.address.is_banned() { + address_list + .unban_address(&response.address) + .map_err(|error| ExecutionError { + inner: Error::from(rs_dapi_client::DapiClientError::AddressList( + error, + )), + retries: response.retries, + address: Some(response.address.clone()), + })?; + } + } + Err(error) => { + if error.can_retry() { + if let Some(address) = &error.address { + if settings.ban_failed_address.unwrap_or(true) { + address_list.ban_address(address).map_err( + |address_list_error| ExecutionError { + inner: Error::from( + rs_dapi_client::DapiClientError::AddressList( + address_list_error, + ), + ), + retries: error.retries, + address: Some(address.clone()), + }, + )?; + } + } + } + } + }; + + result } }; - let result= ::backon::Retryable::retry(closure,backoff_strategy) + let result = ::backon::Retryable::retry(closure, backoff_strategy) .when(|e| { if e.can_retry() { - // requests sent for current execution attempt; + // requests sent for current execution attempt; let requests_sent = e.retries + 1; - // requests sent in all preceeding attempts; user expects `settings.retries +1` + // requests sent in all preceeding attempts; user expects `settings.retries +1` retries += requests_sent; let all_requests_sent = retries; - if all_requests_sent <=max_retries { // we account for for initial request + if all_requests_sent <= max_retries { // we account for initial request tracing::warn!(retry = all_requests_sent, max_retries, error=?e, "retrying request"); let new_settings = RequestSettings { retries: Some(max_retries - all_requests_sent), // limit num of retries for lower layer @@ -352,6 +394,8 @@ mod test { for _ in 0..1 { let counter = Arc::new(AtomicUsize::new(0)); + let address_list = AddressList::default(); + // we retry 5 times, and expect 5 retries + 1 initial request let mut global_settings = RequestSettings::default(); global_settings.retries = Some(expected_requests - 1); @@ -361,7 +405,7 @@ mod test { retry_test_function(s, counter) }; - retry(global_settings, closure) + retry(&address_list, global_settings, closure) .await .expect_err("should fail"); From 41e2d9292c429fdc186bbccb3e0a0d6c89678209 Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Tue, 26 Nov 2024 22:37:23 +0700 Subject: [PATCH 02/13] chore: do not fail if address is not in the list --- packages/rs-dapi-client/src/dapi_client.rs | 60 ++++++++++----- packages/rs-drive-proof-verifier/src/error.rs | 1 - .../src/platform/transition/broadcast.rs | 4 +- packages/rs-sdk/src/sync.rs | 75 +++++++++++++------ 4 files changed, 96 insertions(+), 44 deletions(-) diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 27c00ce0b4..f9ff4f5f62 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -198,14 +198,26 @@ impl DapiRequestExecutor for DapiClient { Ok(_) => { // Unban the address if it was banned and node responded successfully this time if address.is_banned() { - self.address_list.unban_address(&address).map_err(|error| { - ExecutionError { - inner: DapiClientError::AddressList(error), - retries: retries_counter - .load(std::sync::atomic::Ordering::Acquire), - address: Some(address.clone()), + match self.address_list.unban_address(&address) { + Ok(_) => { + tracing::debug!( + ?address, + "unban successfully responded address {}", + address + ); } - })?; + // The address might be already removed from the list + // by background process (i.e., SML update), and it's fine. + Err(AddressListError::AddressNotFound(_)) => { + tracing::debug!( + ?address, + "unable to unban address {address}. it's not in the list" + ); + } + Err(AddressListError::InvalidAddressUri(_)) => { + unreachable!("unban address doesn't return InvalidAddressUri") + } + } } tracing::trace!(?response, "received {} response", response_name); @@ -213,19 +225,29 @@ impl DapiRequestExecutor for DapiClient { Err(error) => { if error.can_retry() { if applied_settings.ban_failed_address { - tracing::warn!( - ?address, - ?error, - "received server error, banning address" - ); - self.address_list.ban_address(&address).map_err(|error| { - ExecutionError { - inner: DapiClientError::AddressList(error), - retries: retries_counter - .load(std::sync::atomic::Ordering::Acquire), - address: Some(address.clone()), + match self.address_list.ban_address(&address) { + Ok(_) => { + tracing::warn!( + ?address, + ?error, + "received server error, banning address {address}" + ); + } + // The address might be already removed from the list + // by background process (i.e., SML update), and it's fine. + Err(AddressListError::AddressNotFound(_)) => { + tracing::debug!( + ?address, + ?error, + "unable to unban address {address}. it's not in the list" + ); } - })?; + Err(AddressListError::InvalidAddressUri(_)) => { + unreachable!( + "unban address doesn't return InvalidAddressUri" + ) + } + } } else { tracing::debug!( ?address, diff --git a/packages/rs-drive-proof-verifier/src/error.rs b/packages/rs-drive-proof-verifier/src/error.rs index 8c0664c825..3fb5825a8c 100644 --- a/packages/rs-drive-proof-verifier/src/error.rs +++ b/packages/rs-drive-proof-verifier/src/error.rs @@ -1,5 +1,4 @@ use dpp::ProtocolError; -use drive::grovedb::operations::proof::GroveDBProof; /// Errors #[derive(Debug, thiserror::Error)] diff --git a/packages/rs-sdk/src/platform/transition/broadcast.rs b/packages/rs-sdk/src/platform/transition/broadcast.rs index f41a279b13..f7c3f75d32 100644 --- a/packages/rs-sdk/src/platform/transition/broadcast.rs +++ b/packages/rs-sdk/src/platform/transition/broadcast.rs @@ -52,7 +52,7 @@ impl BroadcastStateTransition for StateTransition { }; // response is empty for a broadcast, result comes from the stream wait for state transition result - retry(retry_settings, factory) + retry(sdk.address_list(), retry_settings, factory) .await .into_inner() .map(|_| ()) @@ -122,7 +122,7 @@ impl BroadcastStateTransition for StateTransition { .wrap_to_execution_result(&response) }; - let future = retry(retry_settings, factory); + let future = retry(sdk.address_list(), retry_settings, factory); // run the future with or without timeout, depending on the settings let wait_timeout = settings.and_then(|s| s.wait_timeout); match wait_timeout { diff --git a/packages/rs-sdk/src/sync.rs b/packages/rs-sdk/src/sync.rs index a2e4b00f1d..d887e8b893 100644 --- a/packages/rs-sdk/src/sync.rs +++ b/packages/rs-sdk/src/sync.rs @@ -4,10 +4,9 @@ //! inside a tokio runtime. This module spawns async futures in active tokio runtime, and retrieves the result //! using a channel. -use crate::Error; use arc_swap::ArcSwap; use drive_proof_verifier::error::ContextProviderError; -use rs_dapi_client::{AddressList, CanRetry, ExecutionError, ExecutionResult, RequestSettings}; +use rs_dapi_client::{AddressList, AddressListError, CanRetry, ExecutionResult, RequestSettings}; use std::{ fmt::Debug, future::Future, @@ -193,36 +192,68 @@ where let mut func = inner_fn.lock().await; let result = (*func)(*settings).await; + // Ban or unban the address based on the result match &result { Ok(response) => { // Unban the address if it was banned and node responded successfully this time if response.address.is_banned() { - address_list - .unban_address(&response.address) - .map_err(|error| ExecutionError { - inner: Error::from(rs_dapi_client::DapiClientError::AddressList( - error, - )), - retries: response.retries, - address: Some(response.address.clone()), - })?; + match address_list.unban_address(&response.address) { + Ok(_) => { + tracing::debug!( + address = ?response.address, + "unban successfully responded address {}", + response.address, + ) + } + // The address might be already removed from the list + // by background process (i.e., SML update), and it's fine. + Err(AddressListError::AddressNotFound(_)) => { + tracing::debug!( + address = ?response.address, + "unable to unban address {}. it's not in the list", + response.address + ); + } + Err(AddressListError::InvalidAddressUri(_)) => { + unreachable!("unban address doesn't return InvalidAddressUri") + } + } } } Err(error) => { + // Ban address if it failed and can be retried if error.can_retry() { + // Address must be present if let Some(address) = &error.address { + // And ban logic must be enabled for this request if settings.ban_failed_address.unwrap_or(true) { - address_list.ban_address(address).map_err( - |address_list_error| ExecutionError { - inner: Error::from( - rs_dapi_client::DapiClientError::AddressList( - address_list_error, - ), - ), - retries: error.retries, - address: Some(address.clone()), - }, - )?; + match address_list.ban_address(address) { + Ok(_) => { + tracing::warn!( + ?address, + ?error, + "received server error, banning address {address}" + ); + } + // The address might be already removed from the list + // by background process (i.e., SML update), and it's fine. + Err(AddressListError::AddressNotFound(_)) => { + tracing::debug!( + ?address, + ?error, + "unable to ban address {address}. it's not in the list" + ); + } + Err(AddressListError::InvalidAddressUri(_)) => { + unreachable!("ban address doesn't return InvalidAddressUri") + } + } + } else { + tracing::debug!( + ?address, + ?error, + "received server error, we should ban the node but banning is disabled" + ); } } } From 54c8fc43909c3f30c6cbc85effaa26002d8d48bb Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Tue, 26 Nov 2024 22:50:59 +0700 Subject: [PATCH 03/13] refactor: remove unnecessary move --- packages/rs-dapi-client/src/address_list.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/rs-dapi-client/src/address_list.rs b/packages/rs-dapi-client/src/address_list.rs index 000d79e5be..2e60e32e0a 100644 --- a/packages/rs-dapi-client/src/address_list.rs +++ b/packages/rs-dapi-client/src/address_list.rs @@ -186,7 +186,7 @@ impl AddressList { self.addresses .iter() - .filter(move |addr| { + .filter(|addr| { addr.banned_until .map(|banned_until| banned_until < now) .unwrap_or(true) From 5aadfd0decd8088267b0a2f1115a916a2320d9ab Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Tue, 26 Nov 2024 23:02:44 +0700 Subject: [PATCH 04/13] test: fix iterator in evonode test --- packages/rs-sdk/tests/fetch/evonode.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/rs-sdk/tests/fetch/evonode.rs b/packages/rs-sdk/tests/fetch/evonode.rs index 0d35d5be9f..32ad4ee25b 100644 --- a/packages/rs-sdk/tests/fetch/evonode.rs +++ b/packages/rs-sdk/tests/fetch/evonode.rs @@ -5,6 +5,7 @@ use dash_sdk::platform::{types::evonode::EvoNode, FetchUnproved}; use dpp::dashcore::{hashes::Hash, ProTxHash}; use drive_proof_verifier::types::EvoNodeStatus; use http::Uri; +use std::ops::Deref; use std::time::Duration; /// Given some existing evonode URIs, WHEN we connect to them, THEN we get status. use tokio::time::timeout; @@ -18,7 +19,8 @@ async fn test_evonode_status() { let addresses = cfg.address_list(); - for address in addresses { + for address in addresses.iter() { + let address = address.deref(); let node = EvoNode::new(address.clone()); match timeout( Duration::from_secs(3), From 945ddf67f38cf70038aa72edb23a49fd288b8f9f Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Wed, 27 Nov 2024 20:19:42 +0700 Subject: [PATCH 05/13] refactor: reuse ban function from dapi client --- packages/rs-dapi-client/src/dapi_client.rs | 182 ++++++++++++--------- packages/rs-dapi-client/src/lib.rs | 2 +- packages/rs-sdk/src/sync.rs | 75 +-------- packages/rs-sdk/tests/fetch/evonode.rs | 5 +- 4 files changed, 112 insertions(+), 152 deletions(-) diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index f9ff4f5f62..d399de4686 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -3,7 +3,7 @@ use backon::{ConstantBuilder, Retryable}; use dapi_grpc::mock::Mockable; use dapi_grpc::tonic::async_trait; -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Duration; @@ -11,6 +11,7 @@ use tracing::Instrument; use crate::address_list::AddressListError; use crate::connection_pool::ConnectionPool; +use crate::request_settings::AppliedRequestSettings; use crate::transport::TransportError; use crate::{ transport::{TransportClient, TransportRequest}, @@ -100,6 +101,81 @@ impl DapiClient { } } +/// Ban address in case of retryable error or unban it +/// if it was banned, and the request was successful. +pub fn ban_failed_address( + address_list: &AddressList, + result: &ExecutionResult, + applied_settings: &AppliedRequestSettings, +) where + E: CanRetry + Display + Debug, +{ + match &result { + Ok(response) => { + // Unban the address if it was banned and node responded successfully this time + if response.address.is_banned() { + match address_list.unban_address(&response.address) { + Ok(_) => { + tracing::debug!(address = ?response.address, "unban successfully responded address {}", response.address); + } + // The address might be already removed from the list + // by background process (i.e., SML update), and it's fine. + Err(AddressListError::AddressNotFound(_)) => { + tracing::debug!( + address = ?response.address, + "unable to unban address {} because it's not in the list anymore", + response.address + ); + } + Err(AddressListError::InvalidAddressUri(_)) => { + unreachable!("unban address doesn't return InvalidAddressUri") + } + } + } + } + Err(error) => { + if error.can_retry() { + if let Some(address) = error.address.as_ref() { + if applied_settings.ban_failed_address { + match address_list.ban_address(address) { + Ok(_) => { + tracing::warn!( + ?address, + ?error, + "ban address {address} due to error: {error}" + ); + } + // The address might be already removed from the list + // by background process (i.e., SML update), and it's fine. + Err(AddressListError::AddressNotFound(_)) => { + tracing::debug!( + ?address, + ?error, + "unable to ban address {address} because it's not in the list anymore" + ); + } + Err(AddressListError::InvalidAddressUri(_)) => { + unreachable!("ban address doesn't return InvalidAddressUri") + } + } + } else { + tracing::debug!( + ?error, + ?address, + "we should ban the address {address} due to the error but banning is disabled" + ); + } + } else { + tracing::debug!( + ?error, + "we should ban an address due to the error but address is absent" + ); + } + } + } + }; +} + #[async_trait] impl DapiRequestExecutor for DapiClient { /// Execute the [DapiRequest](crate::DapiRequest). @@ -189,94 +265,40 @@ impl DapiRequestExecutor for DapiClient { address: Some(address.clone()), })?; - let response = transport_request + let result = transport_request .execute_transport(&mut transport_client, &applied_settings) .await .map_err(DapiClientError::Transport); - match &response { - Ok(_) => { - // Unban the address if it was banned and node responded successfully this time - if address.is_banned() { - match self.address_list.unban_address(&address) { - Ok(_) => { - tracing::debug!( - ?address, - "unban successfully responded address {}", - address - ); - } - // The address might be already removed from the list - // by background process (i.e., SML update), and it's fine. - Err(AddressListError::AddressNotFound(_)) => { - tracing::debug!( - ?address, - "unable to unban address {address}. it's not in the list" - ); - } - Err(AddressListError::InvalidAddressUri(_)) => { - unreachable!("unban address doesn't return InvalidAddressUri") - } - } + let retries = retries_counter.load(std::sync::atomic::Ordering::Acquire); + + let execution_result = result + .map(|inner| { + tracing::trace!(response = ?inner, "received {} response", response_name); + + ExecutionResponse { + inner, + retries, + address: address.clone(), } + }) + .map_err(|inner| { + tracing::debug!(error = ?inner, "received error: {inner}"); - tracing::trace!(?response, "received {} response", response_name); - } - Err(error) => { - if error.can_retry() { - if applied_settings.ban_failed_address { - match self.address_list.ban_address(&address) { - Ok(_) => { - tracing::warn!( - ?address, - ?error, - "received server error, banning address {address}" - ); - } - // The address might be already removed from the list - // by background process (i.e., SML update), and it's fine. - Err(AddressListError::AddressNotFound(_)) => { - tracing::debug!( - ?address, - ?error, - "unable to unban address {address}. it's not in the list" - ); - } - Err(AddressListError::InvalidAddressUri(_)) => { - unreachable!( - "unban address doesn't return InvalidAddressUri" - ) - } - } - } else { - tracing::debug!( - ?address, - ?error, - "received server error, we should ban the node but banning is disabled" - ); - } - } else { - tracing::debug!( - ?error, - "received server error, most likely the request is invalid" - ); + ExecutionError { + inner, + retries, + address: Some(address.clone()), } - } - }; + }); - let retries = retries_counter.load(std::sync::atomic::Ordering::Acquire); + ban_failed_address::( + &self.address_list, + &execution_result, + &applied_settings, + ); - response - .map(|inner| ExecutionResponse { - inner, - retries, - address: address.clone(), - }) - .map_err(|inner| ExecutionError { - inner, - retries, - address: Some(address), - }) + execution_result } }; diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index f8c03f3956..4e70aa5f23 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -17,7 +17,7 @@ pub use address_list::Address; pub use address_list::AddressList; pub use address_list::AddressListError; pub use connection_pool::ConnectionPool; -pub use dapi_client::{DapiClient, DapiClientError}; +pub use dapi_client::{ban_failed_address, DapiClient, DapiClientError}; #[cfg(feature = "dump")] pub use dump::DumpData; pub use executor::{ diff --git a/packages/rs-sdk/src/sync.rs b/packages/rs-sdk/src/sync.rs index d887e8b893..a42f3cc011 100644 --- a/packages/rs-sdk/src/sync.rs +++ b/packages/rs-sdk/src/sync.rs @@ -6,7 +6,8 @@ use arc_swap::ArcSwap; use drive_proof_verifier::error::ContextProviderError; -use rs_dapi_client::{AddressList, AddressListError, CanRetry, ExecutionResult, RequestSettings}; +use rs_dapi_client::{ban_failed_address, AddressList, CanRetry, ExecutionResult, RequestSettings}; +use std::fmt::Display; use std::{ fmt::Debug, future::Future, @@ -163,7 +164,7 @@ pub async fn retry( where Fut: Future>, FutureFactoryFn: FnMut(RequestSettings) -> Fut, - E: CanRetry + Debug, + E: CanRetry + Display + Debug, { let max_retries = settings.retries.unwrap_or_default(); @@ -193,72 +194,7 @@ where let result = (*func)(*settings).await; // Ban or unban the address based on the result - match &result { - Ok(response) => { - // Unban the address if it was banned and node responded successfully this time - if response.address.is_banned() { - match address_list.unban_address(&response.address) { - Ok(_) => { - tracing::debug!( - address = ?response.address, - "unban successfully responded address {}", - response.address, - ) - } - // The address might be already removed from the list - // by background process (i.e., SML update), and it's fine. - Err(AddressListError::AddressNotFound(_)) => { - tracing::debug!( - address = ?response.address, - "unable to unban address {}. it's not in the list", - response.address - ); - } - Err(AddressListError::InvalidAddressUri(_)) => { - unreachable!("unban address doesn't return InvalidAddressUri") - } - } - } - } - Err(error) => { - // Ban address if it failed and can be retried - if error.can_retry() { - // Address must be present - if let Some(address) = &error.address { - // And ban logic must be enabled for this request - if settings.ban_failed_address.unwrap_or(true) { - match address_list.ban_address(address) { - Ok(_) => { - tracing::warn!( - ?address, - ?error, - "received server error, banning address {address}" - ); - } - // The address might be already removed from the list - // by background process (i.e., SML update), and it's fine. - Err(AddressListError::AddressNotFound(_)) => { - tracing::debug!( - ?address, - ?error, - "unable to ban address {address}. it's not in the list" - ); - } - Err(AddressListError::InvalidAddressUri(_)) => { - unreachable!("ban address doesn't return InvalidAddressUri") - } - } - } else { - tracing::debug!( - ?address, - ?error, - "received server error, we should ban the node but banning is disabled" - ); - } - } - } - } - }; + ban_failed_address(address_list, &result, &settings.finalize()); result } @@ -304,6 +240,7 @@ where #[cfg(test)] mod test { use super::*; + use derive_more::Display; use http::Uri; use rs_dapi_client::ExecutionError; use std::{ @@ -387,7 +324,7 @@ mod test { } } - #[derive(Debug)] + #[derive(Debug, Display)] enum MockError { Generic, } diff --git a/packages/rs-sdk/tests/fetch/evonode.rs b/packages/rs-sdk/tests/fetch/evonode.rs index 32ad4ee25b..8f079c0f8d 100644 --- a/packages/rs-sdk/tests/fetch/evonode.rs +++ b/packages/rs-sdk/tests/fetch/evonode.rs @@ -35,8 +35,9 @@ async fn test_evonode_status() { status.chain.latest_block_height > 0, "latest block height must be positive" ); - assert!( - status.node.pro_tx_hash.unwrap_or_default().len() == ProTxHash::LEN, + assert_eq!( + status.node.pro_tx_hash.unwrap_or_default().len(), + ProTxHash::LEN, "latest block hash must be non-empty" ); // Add more specific assertions based on expected status properties From db7ce774a86b0feec61905f9ba262abcf42c19fc Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Thu, 28 Nov 2024 20:36:35 +0700 Subject: [PATCH 06/13] refactor: switch back to RwLock --- Cargo.lock | 15 -- packages/rs-dapi-client/Cargo.toml | 1 - packages/rs-dapi-client/src/address_list.rs | 150 ++++++++++++-------- packages/rs-dapi-client/src/dapi_client.rs | 56 +++----- packages/rs-dapi-client/src/lib.rs | 1 + scripts/configure_test_suite_network.sh | 2 +- 6 files changed, 117 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1c445a563a..27dd9589d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,20 +1212,6 @@ dependencies = [ "serde", ] -[[package]] -name = "dashmap" -version = "6.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" -dependencies = [ - "cfg-if", - "crossbeam-utils", - "hashbrown 0.14.5", - "lock_api", - "once_cell", - "parking_lot_core", -] - [[package]] name = "dashpay-contract" version = "1.6.0-dev.1" @@ -4005,7 +3991,6 @@ dependencies = [ "backon", "chrono", "dapi-grpc", - "dashmap", "futures", "hex", "http-serde", diff --git a/packages/rs-dapi-client/Cargo.toml b/packages/rs-dapi-client/Cargo.toml index 08a75c0a06..8113ab013d 100644 --- a/packages/rs-dapi-client/Cargo.toml +++ b/packages/rs-dapi-client/Cargo.toml @@ -37,7 +37,6 @@ lru = { version = "0.12.3" } serde = { version = "1.0.197", optional = true, features = ["derive"] } serde_json = { version = "1.0.120", optional = true } chrono = { version = "0.4.38", features = ["serde"] } -dashmap = "6.1.0" [dev-dependencies] tokio = { version = "1.40", features = ["macros"] } diff --git a/packages/rs-dapi-client/src/address_list.rs b/packages/rs-dapi-client/src/address_list.rs index 2e60e32e0a..8b18af8ea5 100644 --- a/packages/rs-dapi-client/src/address_list.rs +++ b/packages/rs-dapi-client/src/address_list.rs @@ -3,12 +3,13 @@ use chrono::Utc; use dapi_grpc::tonic::codegen::http; use dapi_grpc::tonic::transport::Uri; -use dashmap::setref::multiple::RefMulti; -use dashmap::DashSet; use rand::{rngs::SmallRng, seq::IteratorRandom, SeedableRng}; +use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::hash::{Hash, Hasher}; +use std::mem; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; const DEFAULT_BASE_BAN_PERIOD: Duration = Duration::from_secs(60); @@ -16,12 +17,7 @@ const DEFAULT_BASE_BAN_PERIOD: Duration = Duration::from_secs(60); /// DAPI address. #[derive(Debug, Clone, Eq)] #[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] -pub struct Address { - ban_count: usize, - banned_until: Option>, - #[cfg_attr(feature = "mocks", serde(with = "http_serde::uri"))] - uri: Uri, -} +pub struct Address(#[cfg_attr(feature = "mocks", serde(with = "http_serde::uri"))] Uri); impl FromStr for Address { type Err = AddressListError; @@ -35,35 +31,46 @@ impl FromStr for Address { impl PartialEq for Address { fn eq(&self, other: &Self) -> bool { - self.uri == other.uri + self.0 == other.0 } } impl PartialEq for Address { fn eq(&self, other: &Uri) -> bool { - self.uri == *other + self.0 == *other } } impl Hash for Address { fn hash(&self, state: &mut H) { - self.uri.hash(state); + self.0.hash(state); } } impl From for Address { fn from(uri: Uri) -> Self { - Address { - ban_count: 0, - banned_until: None, - uri, - } + Address(uri) } } impl Address { + /// Get [Uri] of a node. + pub fn uri(&self) -> &Uri { + &self.0 + } +} + +/// Address status +/// Contains information about the number of bans and the time until the next ban is lifted. +#[derive(Debug, Default, Clone)] +pub struct AddressStatus { + ban_count: usize, + banned_until: Option>, +} + +impl AddressStatus { /// Ban the [Address] so it won't be available through [AddressList::get_live_address] for some time. - fn ban(&mut self, base_ban_period: &Duration) { + pub fn ban(&mut self, base_ban_period: &Duration) { let coefficient = (self.ban_count as f64).exp(); let ban_period = Duration::from_secs_f64(base_ban_period.as_secs_f64() * coefficient); @@ -77,24 +84,16 @@ impl Address { } /// Clears ban record. - fn unban(&mut self) { + pub fn unban(&mut self) { self.ban_count = 0; self.banned_until = None; } - - /// Get [Uri] of a node. - pub fn uri(&self) -> &Uri { - &self.uri - } } /// [AddressList] errors #[derive(Debug, thiserror::Error)] #[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] pub enum AddressListError { - /// Specified address is not present in the list - #[error("address {0} not found in the list")] - AddressNotFound(#[cfg_attr(feature = "mocks", serde(with = "http_serde::uri"))] Uri), /// A valid uri is required to create an Address #[error("unable parse address: {0}")] #[cfg_attr(feature = "mocks", serde(skip))] @@ -105,7 +104,7 @@ pub enum AddressListError { /// for [DapiRequest](crate::DapiRequest) execution. #[derive(Debug, Clone)] pub struct AddressList { - addresses: Arc>, + addresses: Arc>>, base_ban_period: Duration, } @@ -117,7 +116,7 @@ impl Default for AddressList { impl std::fmt::Display for Address { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.uri.fmt(f) + self.0.fmt(f) } } @@ -130,43 +129,70 @@ impl AddressList { /// Creates an empty [AddressList] with adjustable base ban time. pub fn with_settings(base_ban_period: Duration) -> Self { AddressList { - addresses: Arc::new(DashSet::new()), + addresses: Arc::new(RwLock::new(HashMap::new())), base_ban_period, } } /// Bans address - pub fn ban_address(&self, address: &Address) -> Result<(), AddressListError> { - if self.addresses.remove(address).is_none() { - return Err(AddressListError::AddressNotFound(address.uri.clone())); - }; + /// Returns false if the address is not in the list. + pub fn ban(&self, address: &Address) -> bool { + let mut guard = self.addresses.write().unwrap(); - let mut banned_address = address.clone(); - banned_address.ban(&self.base_ban_period); + let Some(mut status) = guard.get_mut(address) else { + return false; + }; - self.addresses.insert(banned_address); + status.ban(&self.base_ban_period); - Ok(()) + true } /// Clears address' ban record - pub fn unban_address(&self, address: &Address) -> Result<(), AddressListError> { - if self.addresses.remove(address).is_none() { - return Err(AddressListError::AddressNotFound(address.uri.clone())); + /// Returns false if the address is not in the list. + pub fn unban(&self, address: &Address) -> bool { + let mut guard = self.addresses.write().unwrap(); + + let Some(mut status) = guard.get_mut(address) else { + return false; }; - let mut unbanned_address = address.clone(); - unbanned_address.unban(); + status.unban(); + + true + } - self.addresses.insert(unbanned_address); + /// Check if the address is banned. + pub fn is_banned(&self, address: &Address) -> bool { + let guard = self.addresses.read().unwrap(); - Ok(()) + guard + .get(address) + .map(|status| status.is_banned()) + .unwrap_or(false) } /// Adds a node [Address] to [AddressList] /// Returns false if the address is already in the list. pub fn add(&mut self, address: Address) -> bool { - self.addresses.insert(address) + let mut guard = self.addresses.write().unwrap(); + + match guard.entry(address) { + Entry::Occupied(_) => false, + Entry::Vacant(e) => { + e.insert(AddressStatus::default()); + + true + } + } + } + + /// Remove address from the list + /// Returns [AddressStatus] if the address was in the list. + pub fn remove(&mut self, address: &Address) -> Option { + let mut guard = self.addresses.write().unwrap(); + + guard.remove(address) } // TODO: this is the most simple way to add an address @@ -175,28 +201,32 @@ impl AddressList { /// Add a node [Address] to [AddressList] by [Uri]. /// Returns false if the address is already in the list. pub fn add_uri(&mut self, uri: Uri) -> bool { - self.addresses.insert(uri.into()) + self.add(Address::from(uri)) } /// Randomly select a not banned address. - pub fn get_live_address(&self) -> Option> { + pub fn get_live_address(&self) -> Option
{ + let mut guard = self.addresses.read().unwrap(); + let mut rng = SmallRng::from_entropy(); let now = chrono::Utc::now(); - self.addresses + guard .iter() - .filter(|addr| { - addr.banned_until + .filter(|(addr, status)| { + status + .banned_until .map(|banned_until| banned_until < now) .unwrap_or(true) }) .choose(&mut rng) + .map(|(addr, _)| addr.clone()) } /// Get number of all addresses, both banned and not banned. pub fn len(&self) -> usize { - self.addresses.len() + self.addresses.read().unwrap().len() } /// Check if the list is empty. @@ -204,12 +234,20 @@ impl AddressList { /// Returns false if there is at least one address in the list. /// Banned addresses are also counted. pub fn is_empty(&self) -> bool { - self.addresses.is_empty() + self.addresses.read().unwrap().is_empty() } +} + +impl IntoIterator for AddressList { + type Item = (Address, AddressStatus); + type IntoIter = std::collections::hash_map::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + let mut guard = self.addresses.write().unwrap(); + + let addresses_map = mem::take(&mut *guard); - /// Get an iterator over all addresses. - pub fn iter(&self) -> impl Iterator> { - self.addresses.iter() + addresses_map.into_iter() } } diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index d399de4686..e459ef6de4 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -113,23 +113,17 @@ pub fn ban_failed_address( match &result { Ok(response) => { // Unban the address if it was banned and node responded successfully this time - if response.address.is_banned() { - match address_list.unban_address(&response.address) { - Ok(_) => { - tracing::debug!(address = ?response.address, "unban successfully responded address {}", response.address); - } + if address_list.is_banned(&response.address) { + if address_list.unban(&response.address) { + tracing::debug!(address = ?response.address, "unban successfully responded address {}", response.address); + } else { // The address might be already removed from the list // by background process (i.e., SML update), and it's fine. - Err(AddressListError::AddressNotFound(_)) => { - tracing::debug!( - address = ?response.address, - "unable to unban address {} because it's not in the list anymore", - response.address - ); - } - Err(AddressListError::InvalidAddressUri(_)) => { - unreachable!("unban address doesn't return InvalidAddressUri") - } + tracing::debug!( + address = ?response.address, + "unable to unban address {} because it's not in the list anymore", + response.address + ); } } } @@ -137,26 +131,20 @@ pub fn ban_failed_address( if error.can_retry() { if let Some(address) = error.address.as_ref() { if applied_settings.ban_failed_address { - match address_list.ban_address(address) { - Ok(_) => { - tracing::warn!( - ?address, - ?error, - "ban address {address} due to error: {error}" - ); - } + if address_list.ban(address) { + tracing::warn!( + ?address, + ?error, + "ban address {address} due to error: {error}" + ); + } else { // The address might be already removed from the list // by background process (i.e., SML update), and it's fine. - Err(AddressListError::AddressNotFound(_)) => { - tracing::debug!( - ?address, - ?error, - "unable to ban address {address} because it's not in the list anymore" - ); - } - Err(AddressListError::InvalidAddressUri(_)) => { - unreachable!("ban address doesn't return InvalidAddressUri") - } + tracing::debug!( + ?address, + ?error, + "unable to ban address {address} because it's not in the list anymore" + ); } } else { tracing::debug!( @@ -219,8 +207,6 @@ impl DapiRequestExecutor for DapiClient { let address_result = self .address_list .get_live_address() - .as_deref() - .cloned() .ok_or(DapiClientError::NoAvailableAddresses); let _span = tracing::trace_span!( diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index 4e70aa5f23..c660993804 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -16,6 +16,7 @@ pub mod transport; pub use address_list::Address; pub use address_list::AddressList; pub use address_list::AddressListError; +pub use address_list::AddressStatus; pub use connection_pool::ConnectionPool; pub use dapi_client::{ban_failed_address, DapiClient, DapiClientError}; #[cfg(feature = "dump")] diff --git a/scripts/configure_test_suite_network.sh b/scripts/configure_test_suite_network.sh index 54e6f99349..498e9d2d03 100755 --- a/scripts/configure_test_suite_network.sh +++ b/scripts/configure_test_suite_network.sh @@ -66,7 +66,7 @@ else CERT_FLAG="" ST_EXECUTION_INTERVAL=15000 fi -SKIP_SYNC_BEFORE_HEIGHT=$(curl -s $INSIGHT_URL | jq '.height - 200') +SKIP_SYNC_BEFORE_HEIGHT=4800 # $(curl -s $INSIGHT_URL | jq '.height - 200') # check variables are not empty if [ -z "$FAUCET_ADDRESS" ] || \ From d1dca098bcfe2996f53891f81b444ce747c2f661 Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Thu, 28 Nov 2024 21:17:19 +0700 Subject: [PATCH 07/13] refactor: remove extra mut --- packages/rs-dapi-client/src/address_list.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/rs-dapi-client/src/address_list.rs b/packages/rs-dapi-client/src/address_list.rs index 8b18af8ea5..5a92df63fe 100644 --- a/packages/rs-dapi-client/src/address_list.rs +++ b/packages/rs-dapi-client/src/address_list.rs @@ -139,7 +139,7 @@ impl AddressList { pub fn ban(&self, address: &Address) -> bool { let mut guard = self.addresses.write().unwrap(); - let Some(mut status) = guard.get_mut(address) else { + let Some(status) = guard.get_mut(address) else { return false; }; @@ -153,7 +153,7 @@ impl AddressList { pub fn unban(&self, address: &Address) -> bool { let mut guard = self.addresses.write().unwrap(); - let Some(mut status) = guard.get_mut(address) else { + let Some(status) = guard.get_mut(address) else { return false; }; @@ -206,7 +206,7 @@ impl AddressList { /// Randomly select a not banned address. pub fn get_live_address(&self) -> Option
{ - let mut guard = self.addresses.read().unwrap(); + let guard = self.addresses.read().unwrap(); let mut rng = SmallRng::from_entropy(); @@ -214,7 +214,7 @@ impl AddressList { guard .iter() - .filter(|(addr, status)| { + .filter(|(_, status)| { status .banned_until .map(|banned_until| banned_until < now) From 571265a33fdaf8c414b4d8af9e80dee0e37522b2 Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Thu, 28 Nov 2024 21:18:18 +0700 Subject: [PATCH 08/13] test: update evonode tests --- packages/rs-sdk/tests/fetch/evonode.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/rs-sdk/tests/fetch/evonode.rs b/packages/rs-sdk/tests/fetch/evonode.rs index 8f079c0f8d..6a6ce4a1f8 100644 --- a/packages/rs-sdk/tests/fetch/evonode.rs +++ b/packages/rs-sdk/tests/fetch/evonode.rs @@ -5,7 +5,6 @@ use dash_sdk::platform::{types::evonode::EvoNode, FetchUnproved}; use dpp::dashcore::{hashes::Hash, ProTxHash}; use drive_proof_verifier::types::EvoNodeStatus; use http::Uri; -use std::ops::Deref; use std::time::Duration; /// Given some existing evonode URIs, WHEN we connect to them, THEN we get status. use tokio::time::timeout; @@ -17,10 +16,7 @@ async fn test_evonode_status() { let cfg = Config::new(); let sdk = cfg.setup_api("test_evonode_status").await; - let addresses = cfg.address_list(); - - for address in addresses.iter() { - let address = address.deref(); + for (address, _status) in cfg.address_list() { let node = EvoNode::new(address.clone()); match timeout( Duration::from_secs(3), From 57c2871c85b6b49d2c273c7fcf499ca8b1c5e474 Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Thu, 28 Nov 2024 21:23:05 +0700 Subject: [PATCH 09/13] test: fix address list not implemented --- packages/rs-sdk/src/sdk.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index a89086d39e..bceb9dacf3 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -184,7 +184,7 @@ enum SdkInstance { dapi: Arc>, /// Mock SDK implementation processing mock expectations and responses. mock: Arc>, - + address_list: AddressList, /// Platform version configured for this Sdk version: &'static PlatformVersion, }, @@ -558,9 +558,7 @@ impl Sdk { match &self.inner { SdkInstance::Dapi { dapi, .. } => dapi.address_list(), #[cfg(feature = "mocks")] - SdkInstance::Mock { .. } => { - unimplemented!("mock Sdk does not have address list") - } + SdkInstance::Mock { address_list, .. } => address_list, } } } @@ -1020,10 +1018,11 @@ impl SdkBuilder { let sdk= Sdk { network: self.network, dapi_client_settings: self.settings, - inner:SdkInstance::Mock { - mock:mock_sdk.clone(), + inner: SdkInstance::Mock { + mock: mock_sdk.clone(), dapi, - version:self.version, + address_list: AddressList::new(), + version: self.version, }, dump_dir: self.dump_dir.clone(), proofs:self.proofs, From 16b8b0a89c49ee5b49a37a8af6ab583c395ac325 Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Fri, 29 Nov 2024 10:54:50 +0700 Subject: [PATCH 10/13] docs: update documentation --- packages/rs-sdk/src/sync.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/rs-sdk/src/sync.rs b/packages/rs-sdk/src/sync.rs index a42f3cc011..cf826c864a 100644 --- a/packages/rs-sdk/src/sync.rs +++ b/packages/rs-sdk/src/sync.rs @@ -112,6 +112,7 @@ async fn worker( /// /// ## Parameters /// +/// - `address_list` - list of addresses to be used for the requests. /// - `settings` - global settings with any request-specific settings overrides applied. /// - `future_factory_fn` - closure that returns a future that should be retried. It should take [`RequestSettings`] as /// an argument and return [`ExecutionResult`]. From a15f3cebbabb88642ef0db2717f8a6ba604bbea9 Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Tue, 3 Dec 2024 14:19:58 +0700 Subject: [PATCH 11/13] refactor: rename the ban function --- packages/rs-dapi-client/src/dapi_client.rs | 4 ++-- packages/rs-dapi-client/src/lib.rs | 2 +- packages/rs-drive-verify-c-binding/Cargo.toml | 2 +- packages/rs-sdk/src/sync.rs | 6 ++++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index e459ef6de4..9d0ae1b82f 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -103,7 +103,7 @@ impl DapiClient { /// Ban address in case of retryable error or unban it /// if it was banned, and the request was successful. -pub fn ban_failed_address( +pub fn update_address_ban_status( address_list: &AddressList, result: &ExecutionResult, applied_settings: &AppliedRequestSettings, @@ -278,7 +278,7 @@ impl DapiRequestExecutor for DapiClient { } }); - ban_failed_address::( + update_address_ban_status::( &self.address_list, &execution_result, &applied_settings, diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index c660993804..e820a714a0 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -18,7 +18,7 @@ pub use address_list::AddressList; pub use address_list::AddressListError; pub use address_list::AddressStatus; pub use connection_pool::ConnectionPool; -pub use dapi_client::{ban_failed_address, DapiClient, DapiClientError}; +pub use dapi_client::{update_address_ban_status, DapiClient, DapiClientError}; #[cfg(feature = "dump")] pub use dump::DumpData; pub use executor::{ diff --git a/packages/rs-drive-verify-c-binding/Cargo.toml b/packages/rs-drive-verify-c-binding/Cargo.toml index 1f6d9b4f1e..22da440ca7 100644 --- a/packages/rs-drive-verify-c-binding/Cargo.toml +++ b/packages/rs-drive-verify-c-binding/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rs-drive-verify-c-binding" -version = "1.1.0" +version = "1.6.2" edition = "2021" rust-version.workspace = true diff --git a/packages/rs-sdk/src/sync.rs b/packages/rs-sdk/src/sync.rs index cf826c864a..be7cf9d265 100644 --- a/packages/rs-sdk/src/sync.rs +++ b/packages/rs-sdk/src/sync.rs @@ -6,7 +6,9 @@ use arc_swap::ArcSwap; use drive_proof_verifier::error::ContextProviderError; -use rs_dapi_client::{ban_failed_address, AddressList, CanRetry, ExecutionResult, RequestSettings}; +use rs_dapi_client::{ + update_address_ban_status, AddressList, CanRetry, ExecutionResult, RequestSettings, +}; use std::fmt::Display; use std::{ fmt::Debug, @@ -195,7 +197,7 @@ where let result = (*func)(*settings).await; // Ban or unban the address based on the result - ban_failed_address(address_list, &result, &settings.finalize()); + update_address_ban_status(address_list, &result, &settings.finalize()); result } From d8fd37cf3887a9376869ef7d202f1abc6b09a81e Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Tue, 3 Dec 2024 15:49:49 +0100 Subject: [PATCH 12/13] test(sdk): update test_evonode_status test vectors --- ...eb28ff908ec0406a4ac48fff5727a9b9a4ee57.json} | Bin 155 -> 118 bytes ...3371cdde55f94897dcf9d905573b01fe445fbc9.json | Bin 12517 -> 0 bytes ...59dab90acb35dbe92b5072c67ae42b121a30dae.json | Bin 0 -> 113 bytes 3 files changed, 0 insertions(+), 0 deletions(-) rename packages/rs-sdk/tests/vectors/{test_evonode_status_refused/msg_EvoNode_6db392ff1869b56ecc7de9ace5864123671ed14d3f0c537aa8e878d24e529de5.json => test_evonode_status/msg_EvoNode_dae36baf8dec4f117f97a27099eb28ff908ec0406a4ac48fff5727a9b9a4ee57.json} (58%) delete mode 100644 packages/rs-sdk/tests/vectors/test_evonode_status/msg_EvoNode_fbdf15806b1160a9fb482d5663371cdde55f94897dcf9d905573b01fe445fbc9.json create mode 100644 packages/rs-sdk/tests/vectors/test_evonode_status_refused/msg_EvoNode_7a8ca78c81edf0322718e172f59dab90acb35dbe92b5072c67ae42b121a30dae.json diff --git a/packages/rs-sdk/tests/vectors/test_evonode_status_refused/msg_EvoNode_6db392ff1869b56ecc7de9ace5864123671ed14d3f0c537aa8e878d24e529de5.json b/packages/rs-sdk/tests/vectors/test_evonode_status/msg_EvoNode_dae36baf8dec4f117f97a27099eb28ff908ec0406a4ac48fff5727a9b9a4ee57.json similarity index 58% rename from packages/rs-sdk/tests/vectors/test_evonode_status_refused/msg_EvoNode_6db392ff1869b56ecc7de9ace5864123671ed14d3f0c537aa8e878d24e529de5.json rename to packages/rs-sdk/tests/vectors/test_evonode_status/msg_EvoNode_dae36baf8dec4f117f97a27099eb28ff908ec0406a4ac48fff5727a9b9a4ee57.json index c80da24adbea4698e7b9af024c6761ca9cb3c143..c2bdd966125aa67f97c50972bdc211068c308734 100644 GIT binary patch delta 33 mcmbQuSSF>EQBqP+Y^ASnXk@Nu0EC8ChDOF_X8KAK?PCC$jR=SU delta 70 zcmXS$&1he(l$4kkpPXNsSE6KPpabINrKZFKxtTdiR(Yj4IXX(EMVU%gN*N_31y=g{ UhDPRk20& diff --git a/packages/rs-sdk/tests/vectors/test_evonode_status/msg_EvoNode_fbdf15806b1160a9fb482d5663371cdde55f94897dcf9d905573b01fe445fbc9.json b/packages/rs-sdk/tests/vectors/test_evonode_status/msg_EvoNode_fbdf15806b1160a9fb482d5663371cdde55f94897dcf9d905573b01fe445fbc9.json deleted file mode 100644 index e51843cf30447914599ac4af65de1f69fcf39d58..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12517 zcmcJVS#Bda5Jcy@kD>2^T~#XCE?r^{&}cMf1Ew*CdoZxS8h!VPmt;xQ%Agm(i)5#g znURq}vdAB+-+z1h`p5T|r{~qS?|Aq0*PmbY_S@rX`}Feo*sWgv`?lJy{(gS`cYAv~ z^f#Nkn{k|PhJMwDn6 zjHYh$fZgGT%8yR^|23=9_Ep0q2YWX;x?VvIclw^Y`Ch*iypk|NIr7){>GN`@#CM<9 zi)rqryKXb`Q?L08gDl9Ee#+O_^f@46P3XRlV-k*1X#fUU%C;my^`>)443R{X>PiN~ zxT)m`M?jw6aD)mB-8`qay+wv2v>KB-N5#@(`8q|rp&a`2LQ@L+P)qdC%}f>bopbH) z54?#ZS5QrzW?-9!tnjBXeFYuuu@DDF-tB{j-3(YG^%7bXI=5SHIdtOKXVoESTrs-% z@2exVWdb{yZPbJY)|s8+s9{V1W2c|4u`E5$ zGTW$OJbqvtkyv3!RK0+++j}cJv!zM@q1$*qWoFj5$t`)=e8?qUdmtofYbmjb8zl=$ zSS1#7G-;XyF4rTa?8fW4nz%+OrldJ3+%Rfhr=zE&f9d5;53J_)CVWs?yEFksvCmh6 zR8Ip)!jM~UUdu7tVaT(@z^NymMeSBq|6vWUM?(L%e=$>V*V_ z=^0~H#EjXig`!10Y8robsa09+u^7$z9(&ee@m62&>BD(poZX~-FPo}$%p3#DT*;J* zn6|*$aq@}S1~8pes|BskF26M~oo*=JI$Jf)EC^aPgPlr?6~6x4o)eQDorIRAnyFs) zoPwn%eVC~`%+?U+?fC+?irI%ZoI9Wqvy{yfw`^=kG41T(Kx)FlOAVp$EO8r>;((C! zhwPlxB*NY-PYf-a1H$k<+a)%eg7g)zsn|`^mI~p^h!FFcV=qfcO)-06m{njpM&vI( z0I>EYm*qcAE=(|Q)P0f{T}*M5r3z*m`2Qp`TY{ePV*k_k4%|h5bBrh3zGM7Q{YJsuD%Og3)VxyN`@@!2`sdv0N zl28?f#|Kjrap+^)va}Lh<}!sUcnA=!5c7?LW*?{cCnpG5_~Q^K9zL^2(jujNKA)4& zBZ6xGvAoeqOnx}eE+w3DNTZ2NPCsPV;k6Hwj`^HLsd|lrPmhfZxw{G&s)142}$meZ2U;cIQ#yAB~p|QSwwt#Y4*C;7i9< zypL@l|JoNRU$Z;JA|Gh!kEwn2@p{#d>dgL94t9lLt#N!Mtg8LQnYXNYyP9akc^uo< zbS}3)j3b-4gs9Pib7vmZR_l`Y#RQ!0#{mw*kL;R-V%++?d;>N%AjL&mV&aq9YZI2v z{!aXr#EU^?p)TUjyX5L}Bea&=cew?3ATeTbh)G_r9^zs?U;}F01jRe?q{MmOlt}K@ zA&y%K{63+!zhoK~cqg?)HO7W5`K0zuNhZK9q(cFthctfRcQ3h@lKD))_(mUc}IZI3l2L`ZI=yc)lNDA{T!E literal 0 HcmV?d00001 From 40faca90b40f7a6a00564855f38846768bc39520 Mon Sep 17 00:00:00 2001 From: ivanshumkov Date: Wed, 4 Dec 2024 09:40:08 +0700 Subject: [PATCH 13/13] perf: change atomic to relaxed --- packages/rs-dapi-client/src/dapi_client.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 9d0ae1b82f..ebca641f40 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -234,7 +234,7 @@ impl DapiRequestExecutor for DapiClient { // `impl Future`, not a `Result` itself. let address = address_result.map_err(|inner| ExecutionError { inner, - retries: retries_counter.load(std::sync::atomic::Ordering::Acquire), + retries: retries_counter.load(std::sync::atomic::Ordering::Relaxed), address: None, })?; @@ -247,7 +247,7 @@ impl DapiRequestExecutor for DapiClient { ) .map_err(|error| ExecutionError { inner: DapiClientError::Transport(error), - retries: retries_counter.load(std::sync::atomic::Ordering::Acquire), + retries: retries_counter.load(std::sync::atomic::Ordering::Relaxed), address: Some(address.clone()), })?; @@ -256,7 +256,7 @@ impl DapiRequestExecutor for DapiClient { .await .map_err(DapiClientError::Transport); - let retries = retries_counter.load(std::sync::atomic::Ordering::Acquire); + let retries = retries_counter.load(std::sync::atomic::Ordering::Relaxed); let execution_result = result .map(|inner| { @@ -294,7 +294,7 @@ impl DapiRequestExecutor for DapiClient { .retry(retry_settings) .notify(|error, duration| { let retries_counter = Arc::clone(&retries_counter_arc); - retries_counter.fetch_add(1, std::sync::atomic::Ordering::AcqRel); + retries_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); tracing::warn!( ?error,