From bcd496fc459b6755e2180e6d77031a9896bd95c2 Mon Sep 17 00:00:00 2001 From: Michael Huang Date: Wed, 11 Oct 2023 13:45:19 -0400 Subject: [PATCH] Add ReplicationOptions to SessionConfig for excluding keyspaces from replica precompute We add ReplicationOptions which includes a vector of keyspace names to SessionConfig. When instantiating ClusterData, we exclude keyspaces from provided by ReplicationOptions when computing PrecomputedReplicas to conserve memory/compute resources. --- scylla/src/transport/cluster.rs | 13 ++++++++++-- .../src/transport/load_balancing/default.rs | 20 ++++++++++++++++-- scylla/src/transport/locator/mod.rs | 21 +++++++++++++++++++ scylla/src/transport/session.rs | 6 ++++++ 4 files changed, 56 insertions(+), 4 deletions(-) diff --git a/scylla/src/transport/cluster.rs b/scylla/src/transport/cluster.rs index 503d14519d..e3c9ffecde 100644 --- a/scylla/src/transport/cluster.rs +++ b/scylla/src/transport/cluster.rs @@ -28,7 +28,7 @@ use uuid::Uuid; use super::node::{KnownNode, NodeAddr}; -use super::locator::ReplicaLocator; +use super::locator::{ReplicaLocator, ReplicationOptions}; use super::partitioner::calculate_token_for_partition_key; use super::topology::Strategy; @@ -145,6 +145,7 @@ impl Cluster { fetch_schema_metadata: bool, host_filter: Option>, cluster_metadata_refresh_interval: Duration, + replication_opts: ReplicationOptions, ) -> Result { let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32); let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32); @@ -171,6 +172,7 @@ impl Cluster { &HashMap::new(), &None, host_filter.as_deref(), + Some(replication_opts), ) .await; cluster_data.wait_until_all_pools_are_initialized().await; @@ -274,6 +276,7 @@ impl ClusterData { known_peers: &HashMap>, used_keyspace: &Option, host_filter: Option<&dyn HostFilter>, + replication_opts: Option, ) -> Self { // Create new updated known_peers and ring let mut new_known_peers: HashMap> = @@ -281,6 +284,7 @@ impl ClusterData { let mut ring: Vec<(Token, Arc)> = Vec::new(); let mut datacenters: HashMap = HashMap::new(); let mut all_nodes: Vec> = Vec::with_capacity(metadata.peers.len()); + let replication_opts = replication_opts.unwrap_or_default(); for peer in metadata.peers { // Take existing Arc if possible, otherwise create new one @@ -340,7 +344,11 @@ impl ClusterData { let keyspaces = metadata.keyspaces; let (locator, keyspaces) = tokio::task::spawn_blocking(move || { - let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy); + let keyspaces_to_exclude = replication_opts.keyspaces_to_exclude(); + let keyspace_strategies = keyspaces + .iter() + .filter(|(k, _)| !keyspaces_to_exclude.contains(k)) + .map(|(_, ks)| &ks.strategy); let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies); (locator, keyspaces) }) @@ -662,6 +670,7 @@ impl ClusterWorker { &cluster_data.known_peers, &self.used_keyspace, self.host_filter.as_deref(), + None, ) .await, ); diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 3c2097c9dc..9365352fc4 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -1090,7 +1090,15 @@ mod tests { // based on locator mock cluster pub(crate) async fn mock_cluster_data_for_token_aware_tests() -> ClusterData { let metadata = mock_metadata_for_token_aware_tests(); - ClusterData::new(metadata, &Default::default(), &HashMap::new(), &None, None).await + ClusterData::new( + metadata, + &Default::default(), + &HashMap::new(), + &None, + None, + None, + ) + .await } // creates ClusterData with info about 5 nodes living in 2 different datacenters @@ -1114,7 +1122,15 @@ mod tests { keyspaces: HashMap::new(), }; - ClusterData::new(info, &Default::default(), &HashMap::new(), &None, None).await + ClusterData::new( + info, + &Default::default(), + &HashMap::new(), + &None, + None, + None, + ) + .await } pub(crate) fn get_plan_and_collect_node_identifiers( diff --git a/scylla/src/transport/locator/mod.rs b/scylla/src/transport/locator/mod.rs index db55b9fe69..1b39ba1eb6 100644 --- a/scylla/src/transport/locator/mod.rs +++ b/scylla/src/transport/locator/mod.rs @@ -21,6 +21,27 @@ use std::{ }; use tracing::debug; +#[derive(Debug, Default, Clone)] +pub struct ReplicationOptions { + keyspaces_to_exclude: Vec, +} + +impl ReplicationOptions { + pub(crate) fn new() -> Self { + Self { + keyspaces_to_exclude: Vec::new(), + } + } + + pub fn exclude_keyspace(&mut self, keyspace: String) { + self.keyspaces_to_exclude.push(keyspace) + } + + pub fn keyspaces_to_exclude(&self) -> &Vec { + &self.keyspaces_to_exclude + } +} + /// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication /// strategy) pair. It does so by either using the precomputed token ranges, or doing the /// computation on the fly. diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 35ff25475f..3a0a439f3f 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -60,6 +60,7 @@ use crate::transport::connection_pool::PoolConfig; use crate::transport::host_filter::HostFilter; use crate::transport::iterator::{PreparedIteratorConfig, RowIterator}; use crate::transport::load_balancing::{self, RoutingInfo}; +use crate::transport::locator::ReplicationOptions; use crate::transport::metrics::Metrics; use crate::transport::node::Node; use crate::transport::query_result::QueryResult; @@ -285,6 +286,9 @@ pub struct SessionConfig { /// for e.g: if they do not want unexpected traffic /// or they expect the topology to change frequently. pub cluster_metadata_refresh_interval: Duration, + + /// TODO(michael): Docs ... + pub replication_options: ReplicationOptions, } impl SessionConfig { @@ -331,6 +335,7 @@ impl SessionConfig { tracing_info_fetch_interval: Duration::from_millis(3), tracing_info_fetch_consistency: Consistency::One, cluster_metadata_refresh_interval: Duration::from_secs(60), + replication_options: ReplicationOptions::new(), } } @@ -524,6 +529,7 @@ impl Session { config.fetch_schema_metadata, config.host_filter, config.cluster_metadata_refresh_interval, + config.replication_options, ) .await?;