Skip to content

Commit

Permalink
Add ReplicationOptions to SessionConfig for excluding keyspaces from …
Browse files Browse the repository at this point in the history
…replica precompute

We add ReplicationOptions which includes a vector of keyspace names to SessionConfig.
When instantiating ClusterData, we exclude keyspaces from provided by ReplicationOptions when
computing PrecomputedReplicas to conserve memory/compute resources.
  • Loading branch information
michaelhly committed Oct 11, 2023
1 parent e6d6d3e commit 9f61a0a
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 6 deletions.
23 changes: 20 additions & 3 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/// Cluster manages up to date information and connections to database nodes
// Cluster manages up to date information and connections to database nodes
use crate::frame::response::event::{Event, StatusChangeEvent};
use crate::frame::value::ValueList;
use crate::prepared_statement::TokenCalculationError;
Expand All @@ -22,13 +22,14 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::vec;
use tracing::instrument::WithSubscriber;
use tracing::{debug, warn};
use uuid::Uuid;

use super::node::{KnownNode, NodeAddr};

use super::locator::ReplicaLocator;
use super::locator::{ReplicaLocator, ReplicationConfigs};
use super::partitioner::calculate_token_for_partition_key;
use super::topology::Strategy;

Expand Down Expand Up @@ -68,6 +69,7 @@ pub struct ClusterData {
pub(crate) known_peers: HashMap<Uuid, Arc<Node>>, // Invariant: nonempty after Cluster::new()
pub(crate) keyspaces: HashMap<String, Keyspace>,
pub(crate) locator: ReplicaLocator,
pub(crate) replication_configs: ReplicationConfigs,
}

/// Enables printing [ClusterData] struct in a neat way, skipping the clutter involved by
Expand Down Expand Up @@ -145,6 +147,7 @@ impl Cluster {
fetch_schema_metadata: bool,
host_filter: Option<Arc<dyn HostFilter>>,
cluster_metadata_refresh_interval: Duration,
replication_opts: &ReplicationConfigs,
) -> Result<Cluster, NewSessionError> {
let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
Expand All @@ -171,6 +174,7 @@ impl Cluster {
&HashMap::new(),
&None,
host_filter.as_deref(),
Some(replication_opts),
)
.await;
cluster_data.wait_until_all_pools_are_initialized().await;
Expand Down Expand Up @@ -274,6 +278,7 @@ impl ClusterData {
known_peers: &HashMap<Uuid, Arc<Node>>,
used_keyspace: &Option<VerifiedKeyspaceName>,
host_filter: Option<&dyn HostFilter>,
replication_opts: Option<&ReplicationConfigs>,
) -> Self {
// Create new updated known_peers and ring
let mut new_known_peers: HashMap<Uuid, Arc<Node>> =
Expand Down Expand Up @@ -339,8 +344,15 @@ impl ClusterData {
Self::update_rack_count(&mut datacenters);

let keyspaces = metadata.keyspaces;
let keyspaces_to_exclude = match replication_opts {
Some(opts) => opts.keyspaces_to_exclude().to_vec(),
None => vec![],
};
let (locator, keyspaces) = tokio::task::spawn_blocking(move || {
let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy);
let keyspace_strategies = keyspaces
.iter()
.filter(|(k, _)| !keyspaces_to_exclude.contains(k))
.map(|(_, ks)| &ks.strategy);
let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies);
(locator, keyspaces)
})
Expand All @@ -351,6 +363,10 @@ impl ClusterData {
known_peers: new_known_peers,
keyspaces,
locator,
replication_configs: match replication_opts {
Some(opts) => opts.clone(),
None => ReplicationConfigs::default(),
},
}
}

Expand Down Expand Up @@ -662,6 +678,7 @@ impl ClusterWorker {
&cluster_data.known_peers,
&self.used_keyspace,
self.host_filter.as_deref(),
Some(&cluster_data.replication_configs),
)
.await,
);
Expand Down
20 changes: 18 additions & 2 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,15 @@ mod tests {
// based on locator mock cluster
pub(crate) async fn mock_cluster_data_for_token_aware_tests() -> ClusterData {
let metadata = mock_metadata_for_token_aware_tests();
ClusterData::new(metadata, &Default::default(), &HashMap::new(), &None, None).await
ClusterData::new(
metadata,
&Default::default(),
&HashMap::new(),
&None,
None,
None,
)
.await
}

// creates ClusterData with info about 5 nodes living in 2 different datacenters
Expand All @@ -1114,7 +1122,15 @@ mod tests {
keyspaces: HashMap::new(),
};

ClusterData::new(info, &Default::default(), &HashMap::new(), &None, None).await
ClusterData::new(
info,
&Default::default(),
&HashMap::new(),
&None,
None,
None,
)
.await
}

pub(crate) fn get_plan_and_collect_node_identifiers(
Expand Down
6 changes: 5 additions & 1 deletion scylla/src/transport/load_balancing/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ mod tests {
use std::{net::SocketAddr, str::FromStr, sync::Arc};

use crate::transport::{
locator::test::{create_locator, mock_metadata_for_token_aware_tests},
locator::{
test::{create_locator, mock_metadata_for_token_aware_tests},
ReplicationConfigs,
},
Node, NodeAddr,
};

Expand Down Expand Up @@ -156,6 +159,7 @@ mod tests {
known_peers: Default::default(),
keyspaces: Default::default(),
locator,
replication_configs: ReplicationConfigs::default(),
};
let routing_info = RoutingInfo::default();
let plan = Plan::new(&policy, &routing_info, &cluster_data);
Expand Down
17 changes: 17 additions & 0 deletions scylla/src/transport/locator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,26 @@ use std::{
cmp,
collections::{HashMap, HashSet},
sync::Arc,
vec,
};
use tracing::debug;

/// TODO(michael): Docs
#[derive(Debug, Default, Clone)]
pub struct ReplicationConfigs {
keyspaces_to_exclude: Vec<String>,
}

impl ReplicationConfigs {
pub fn exclude_keyspace(&mut self, keyspace: String) {
self.keyspaces_to_exclude.push(keyspace)
}

pub fn keyspaces_to_exclude(&self) -> &Vec<String> {
&self.keyspaces_to_exclude
}
}

/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication
/// strategy) pair. It does so by either using the precomputed token ranges, or doing the
/// computation on the fly.
Expand Down
6 changes: 6 additions & 0 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::transport::connection_pool::PoolConfig;
use crate::transport::host_filter::HostFilter;
use crate::transport::iterator::{PreparedIteratorConfig, RowIterator};
use crate::transport::load_balancing::{self, RoutingInfo};
use crate::transport::locator::ReplicationConfigs;
use crate::transport::metrics::Metrics;
use crate::transport::node::Node;
use crate::transport::query_result::QueryResult;
Expand Down Expand Up @@ -285,6 +286,9 @@ pub struct SessionConfig {
/// for e.g: if they do not want unexpected traffic
/// or they expect the topology to change frequently.
pub cluster_metadata_refresh_interval: Duration,

/// TODO(michael): Docs ...
pub replication_options: ReplicationConfigs,
}

impl SessionConfig {
Expand Down Expand Up @@ -331,6 +335,7 @@ impl SessionConfig {
tracing_info_fetch_interval: Duration::from_millis(3),
tracing_info_fetch_consistency: Consistency::One,
cluster_metadata_refresh_interval: Duration::from_secs(60),
replication_options: ReplicationConfigs::default(),
}
}

Expand Down Expand Up @@ -524,6 +529,7 @@ impl Session {
config.fetch_schema_metadata,
config.host_filter,
config.cluster_metadata_refresh_interval,
&config.replication_options,
)
.await?;

Expand Down

0 comments on commit 9f61a0a

Please sign in to comment.