Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk)!: ban addresses failed in sdk #2351

Merged
merged 15 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/rs-dapi-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,7 @@ lru = { version = "0.12.3" }
serde = { version = "1.0.197", optional = true, features = ["derive"] }
serde_json = { version = "1.0.120", optional = true }
chrono = { version = "0.4.38", features = ["serde"] }
dashmap = "6.1.0"

[dev-dependencies]
tokio = { version = "1.40", features = ["macros"] }
44 changes: 16 additions & 28 deletions packages/rs-dapi-client/src/address_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
use chrono::Utc;
use dapi_grpc::tonic::codegen::http;
use dapi_grpc::tonic::transport::Uri;
use dashmap::setref::multiple::RefMulti;
use dashmap::DashSet;
use rand::{rngs::SmallRng, seq::IteratorRandom, SeedableRng};
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

const DEFAULT_BASE_BAN_PERIOD: Duration = Duration::from_secs(60);
Expand Down Expand Up @@ -103,7 +105,7 @@ pub enum AddressListError {
/// for [DapiRequest](crate::DapiRequest) execution.
#[derive(Debug, Clone)]
pub struct AddressList {
addresses: HashSet<Address>,
addresses: Arc<DashSet<Address>>,
base_ban_period: Duration,
}

Expand All @@ -128,14 +130,14 @@ impl AddressList {
/// Creates an empty [AddressList] with adjustable base ban time.
pub fn with_settings(base_ban_period: Duration) -> Self {
AddressList {
addresses: HashSet::new(),
addresses: Arc::new(DashSet::new()),
base_ban_period,
}
}

/// Bans address
pub(crate) fn ban_address(&mut self, address: &Address) -> Result<(), AddressListError> {
if !self.addresses.remove(address) {
pub fn ban_address(&self, address: &Address) -> Result<(), AddressListError> {
if self.addresses.remove(address).is_none() {
return Err(AddressListError::AddressNotFound(address.uri.clone()));
};

Expand All @@ -148,8 +150,8 @@ impl AddressList {
}

/// Clears address' ban record
pub(crate) fn unban_address(&mut self, address: &Address) -> Result<(), AddressListError> {
if !self.addresses.remove(address) {
pub fn unban_address(&self, address: &Address) -> Result<(), AddressListError> {
if self.addresses.remove(address).is_none() {
return Err(AddressListError::AddressNotFound(address.uri.clone()));
};

Expand Down Expand Up @@ -177,14 +179,9 @@ impl AddressList {
}

/// Randomly select a not banned address.
pub fn get_live_address(&self) -> Option<&Address> {
pub fn get_live_address(&self) -> Option<RefMulti<Address>> {
let mut rng = SmallRng::from_entropy();

self.unbanned().into_iter().choose(&mut rng)
}

/// Get all addresses that are not banned.
fn unbanned(&self) -> Vec<&Address> {
let now = chrono::Utc::now();

self.addresses
Expand All @@ -194,12 +191,7 @@ impl AddressList {
.map(|banned_until| banned_until < now)
.unwrap_or(true)
})
.collect()
}

/// Get number of available, not banned addresses.
pub fn available(&self) -> usize {
self.unbanned().len()
.choose(&mut rng)
}

/// Get number of all addresses, both banned and not banned.
Expand All @@ -214,6 +206,11 @@ impl AddressList {
pub fn is_empty(&self) -> bool {
self.addresses.is_empty()
}

/// Get an iterator over all addresses.
pub fn iter(&self) -> impl Iterator<Item = RefMulti<Address>> {
self.addresses.iter()
}
}

// TODO: Must be changed to FromStr
Expand All @@ -238,12 +235,3 @@ impl FromIterator<Uri> for AddressList {
address_list
}
}

impl IntoIterator for AddressList {
type Item = Address;
type IntoIter = std::collections::hash_set::IntoIter<Address>;

fn into_iter(self) -> Self::IntoIter {
self.addresses.into_iter()
}
}
86 changes: 47 additions & 39 deletions packages/rs-dapi-client/src/dapi_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use dapi_grpc::mock::Mockable;
use dapi_grpc::tonic::async_trait;
use std::fmt::Debug;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::Duration;
use tracing::Instrument;

Expand Down Expand Up @@ -72,7 +72,7 @@ impl Mockable for DapiClientError {
/// Access point to DAPI.
#[derive(Debug, Clone)]
pub struct DapiClient {
address_list: Arc<RwLock<AddressList>>,
address_list: AddressList,
settings: RequestSettings,
pool: ConnectionPool,
#[cfg(feature = "dump")]
Expand All @@ -86,7 +86,7 @@ impl DapiClient {
let address_count = 3 * address_list.len();

Self {
address_list: Arc::new(RwLock::new(address_list)),
address_list,
settings,
pool: ConnectionPool::new(address_count),
#[cfg(feature = "dump")]
Expand All @@ -95,7 +95,7 @@ impl DapiClient {
}

/// Return the [DapiClient] address list.
pub fn address_list(&self) -> &Arc<RwLock<AddressList>> {
pub fn address_list(&self) -> &AddressList {
&self.address_list
}
}
Expand Down Expand Up @@ -140,18 +140,13 @@ impl DapiRequestExecutor for DapiClient {
let retries_counter = Arc::clone(retries_counter_arc_ref);

// Try to get an address to initialize transport on:
let address_list = self
let address_result = self
.address_list
.read()
.expect("can't get address list for read");

let address_result = address_list
.get_live_address()
.as_deref()
.cloned()
.ok_or(DapiClientError::NoAvailableAddresses);

drop(address_list);

let _span = tracing::trace_span!(
"execute request",
address = ?address_result,
Expand Down Expand Up @@ -203,43 +198,56 @@ impl DapiRequestExecutor for DapiClient {
Ok(_) => {
// Unban the address if it was banned and node responded successfully this time
if address.is_banned() {
let mut address_list = self
.address_list
.write()
.expect("can't get address list for write");

address_list.unban_address(&address).map_err(|error| {
ExecutionError {
inner: DapiClientError::AddressList(error),
retries: retries_counter
.load(std::sync::atomic::Ordering::Acquire),
address: Some(address.clone()),
match self.address_list.unban_address(&address) {
Ok(_) => {
tracing::debug!(
?address,
"unban successfully responded address {}",
address
);
}
// The address might be already removed from the list
// by background process (i.e., SML update), and it's fine.
Err(AddressListError::AddressNotFound(_)) => {
tracing::debug!(
?address,
"unable to unban address {address}. it's not in the list"
);
}
})?;
Err(AddressListError::InvalidAddressUri(_)) => {
unreachable!("unban address doesn't return InvalidAddressUri")
}
}
lklimek marked this conversation as resolved.
Show resolved Hide resolved
}

tracing::trace!(?response, "received {} response", response_name);
}
Err(error) => {
if error.can_retry() {
if applied_settings.ban_failed_address {
let mut address_list = self
.address_list
.write()
.expect("can't get address list for write");
tracing::warn!(
?address,
?error,
"received server error, banning address"
);
address_list.ban_address(&address).map_err(|error| {
ExecutionError {
inner: DapiClientError::AddressList(error),
retries: retries_counter
.load(std::sync::atomic::Ordering::Acquire),
address: Some(address.clone()),
match self.address_list.ban_address(&address) {
Ok(_) => {
tracing::warn!(
?address,
?error,
"received server error, banning address {address}"
);
}
// The address might be already removed from the list
// by background process (i.e., SML update), and it's fine.
Err(AddressListError::AddressNotFound(_)) => {
tracing::debug!(
?address,
?error,
"unable to unban address {address}. it's not in the list"
);
}
})?;
Err(AddressListError::InvalidAddressUri(_)) => {
unreachable!(
"unban address doesn't return InvalidAddressUri"
)
}
}
shumkov marked this conversation as resolved.
Show resolved Hide resolved
} else {
tracing::debug!(
?address,
Expand Down
1 change: 0 additions & 1 deletion packages/rs-drive-proof-verifier/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use dpp::ProtocolError;
use drive::grovedb::operations::proof::GroveDBProof;

/// Errors
#[derive(Debug, thiserror::Error)]
Expand Down
2 changes: 1 addition & 1 deletion packages/rs-sdk/src/platform/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ where
.dapi_client_settings
.override_by(settings.unwrap_or_default());

retry(settings, fut).await.into_inner()
retry(sdk.address_list(), settings, fut).await.into_inner()
}

/// Fetch single object from Platform.
Expand Down
4 changes: 2 additions & 2 deletions packages/rs-sdk/src/platform/fetch_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ where
.dapi_client_settings
.override_by(settings.unwrap_or_default());

retry(settings, fut).await.into_inner()
retry(sdk.address_list(), settings, fut).await.into_inner()
}

/// Fetch multiple objects from Platform by their identifiers.
Expand Down Expand Up @@ -327,7 +327,7 @@ impl FetchMany<Identifier, Documents> for Document {
) -> Result<Documents, Error> {
let document_query: &DocumentQuery = &query.query(sdk.prove())?;

retry(sdk.dapi_client_settings, |settings| async move {
retry(sdk.address_list(), sdk.dapi_client_settings, |settings| async move {
let request = document_query.clone();

let ExecutionResponse {
Expand Down
5 changes: 3 additions & 2 deletions packages/rs-sdk/src/platform/fetch_unproved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ where
/// - `settings`: Request settings for the connection to Platform.
///
/// ## Returns
/// Returns:
/// * `Ok(Some(Self))` when object is found.
/// * `Ok(None)` when object is not found.
/// * [`Err(Error)`](Error) when an error occurs.
Expand Down Expand Up @@ -107,7 +106,9 @@ where
};

let settings = sdk.dapi_client_settings.override_by(settings);
retry(settings, closure).await.into_inner()
retry(sdk.address_list(), settings, closure)
.await
.into_inner()
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/rs-sdk/src/platform/transition/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl BroadcastStateTransition for StateTransition {
};

// response is empty for a broadcast, result comes from the stream wait for state transition result
retry(retry_settings, factory)
retry(sdk.address_list(), retry_settings, factory)
.await
.into_inner()
.map(|_| ())
Expand Down Expand Up @@ -122,7 +122,7 @@ impl BroadcastStateTransition for StateTransition {
.wrap_to_execution_result(&response)
};

let future = retry(retry_settings, factory);
let future = retry(sdk.address_list(), retry_settings, factory);
// run the future with or without timeout, depending on the settings
let wait_timeout = settings.and_then(|s| s.wait_timeout);
match wait_timeout {
Expand Down
10 changes: 2 additions & 8 deletions packages/rs-sdk/src/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,15 +554,9 @@ impl Sdk {
}

/// Return the [DapiClient] address list
pub fn address_list(&self) -> Result<AddressList, String> {
pub fn address_list(&self) -> &AddressList {
match &self.inner {
SdkInstance::Dapi { dapi, version: _ } => {
let address_list_arc = dapi.address_list();
let address_list_lock = address_list_arc
.read()
.map_err(|e| format!("Failed to read address list: {e}"))?;
Ok(address_list_lock.clone())
}
SdkInstance::Dapi { dapi, .. } => dapi.address_list(),
#[cfg(feature = "mocks")]
SdkInstance::Mock { .. } => {
unimplemented!("mock Sdk does not have address list")
Expand Down
Loading
Loading