Skip to content

Commit

Permalink
Add ReplicationOptions to SessionConfig for excluding keyspaces from …
Browse files Browse the repository at this point in the history
…replica precompute

We add ReplicationOptions which includes a vector of keyspace names to SessionConfig.
When instantiating ClusterData, we exclude keyspaces from provided by ReplicationOptions when
computing PrecomputedReplicas to conserve memory/compute resources.
  • Loading branch information
michaelhly committed Oct 11, 2023
1 parent e6d6d3e commit bcd496f
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 4 deletions.
13 changes: 11 additions & 2 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use uuid::Uuid;

use super::node::{KnownNode, NodeAddr};

use super::locator::ReplicaLocator;
use super::locator::{ReplicaLocator, ReplicationOptions};
use super::partitioner::calculate_token_for_partition_key;
use super::topology::Strategy;

Expand Down Expand Up @@ -145,6 +145,7 @@ impl Cluster {
fetch_schema_metadata: bool,
host_filter: Option<Arc<dyn HostFilter>>,
cluster_metadata_refresh_interval: Duration,
replication_opts: ReplicationOptions,
) -> Result<Cluster, NewSessionError> {
let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
Expand All @@ -171,6 +172,7 @@ impl Cluster {
&HashMap::new(),
&None,
host_filter.as_deref(),
Some(replication_opts),
)
.await;
cluster_data.wait_until_all_pools_are_initialized().await;
Expand Down Expand Up @@ -274,13 +276,15 @@ impl ClusterData {
known_peers: &HashMap<Uuid, Arc<Node>>,
used_keyspace: &Option<VerifiedKeyspaceName>,
host_filter: Option<&dyn HostFilter>,
replication_opts: Option<ReplicationOptions>,
) -> Self {
// Create new updated known_peers and ring
let mut new_known_peers: HashMap<Uuid, Arc<Node>> =
HashMap::with_capacity(metadata.peers.len());
let mut ring: Vec<(Token, Arc<Node>)> = Vec::new();
let mut datacenters: HashMap<String, Datacenter> = HashMap::new();
let mut all_nodes: Vec<Arc<Node>> = Vec::with_capacity(metadata.peers.len());
let replication_opts = replication_opts.unwrap_or_default();

for peer in metadata.peers {
// Take existing Arc<Node> if possible, otherwise create new one
Expand Down Expand Up @@ -340,7 +344,11 @@ impl ClusterData {

let keyspaces = metadata.keyspaces;
let (locator, keyspaces) = tokio::task::spawn_blocking(move || {
let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy);
let keyspaces_to_exclude = replication_opts.keyspaces_to_exclude();
let keyspace_strategies = keyspaces
.iter()
.filter(|(k, _)| !keyspaces_to_exclude.contains(k))
.map(|(_, ks)| &ks.strategy);
let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies);
(locator, keyspaces)
})
Expand Down Expand Up @@ -662,6 +670,7 @@ impl ClusterWorker {
&cluster_data.known_peers,
&self.used_keyspace,
self.host_filter.as_deref(),
None,
)
.await,
);
Expand Down
20 changes: 18 additions & 2 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,15 @@ mod tests {
// based on locator mock cluster
pub(crate) async fn mock_cluster_data_for_token_aware_tests() -> ClusterData {
let metadata = mock_metadata_for_token_aware_tests();
ClusterData::new(metadata, &Default::default(), &HashMap::new(), &None, None).await
ClusterData::new(
metadata,
&Default::default(),
&HashMap::new(),
&None,
None,
None,
)
.await
}

// creates ClusterData with info about 5 nodes living in 2 different datacenters
Expand All @@ -1114,7 +1122,15 @@ mod tests {
keyspaces: HashMap::new(),
};

ClusterData::new(info, &Default::default(), &HashMap::new(), &None, None).await
ClusterData::new(
info,
&Default::default(),
&HashMap::new(),
&None,
None,
None,
)
.await
}

pub(crate) fn get_plan_and_collect_node_identifiers(
Expand Down
21 changes: 21 additions & 0 deletions scylla/src/transport/locator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@ use std::{
};
use tracing::debug;

#[derive(Debug, Default, Clone)]
pub struct ReplicationOptions {
keyspaces_to_exclude: Vec<String>,
}

impl ReplicationOptions {
pub(crate) fn new() -> Self {
Self {
keyspaces_to_exclude: Vec::new(),
}
}

pub fn exclude_keyspace(&mut self, keyspace: String) {
self.keyspaces_to_exclude.push(keyspace)
}

pub fn keyspaces_to_exclude(&self) -> &Vec<String> {
&self.keyspaces_to_exclude
}
}

/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication
/// strategy) pair. It does so by either using the precomputed token ranges, or doing the
/// computation on the fly.
Expand Down
6 changes: 6 additions & 0 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::transport::connection_pool::PoolConfig;
use crate::transport::host_filter::HostFilter;
use crate::transport::iterator::{PreparedIteratorConfig, RowIterator};
use crate::transport::load_balancing::{self, RoutingInfo};
use crate::transport::locator::ReplicationOptions;
use crate::transport::metrics::Metrics;
use crate::transport::node::Node;
use crate::transport::query_result::QueryResult;
Expand Down Expand Up @@ -285,6 +286,9 @@ pub struct SessionConfig {
/// for e.g: if they do not want unexpected traffic
/// or they expect the topology to change frequently.
pub cluster_metadata_refresh_interval: Duration,

/// TODO(michael): Docs ...
pub replication_options: ReplicationOptions,
}

impl SessionConfig {
Expand Down Expand Up @@ -331,6 +335,7 @@ impl SessionConfig {
tracing_info_fetch_interval: Duration::from_millis(3),
tracing_info_fetch_consistency: Consistency::One,
cluster_metadata_refresh_interval: Duration::from_secs(60),
replication_options: ReplicationOptions::new(),
}
}

Expand Down Expand Up @@ -524,6 +529,7 @@ impl Session {
config.fetch_schema_metadata,
config.host_filter,
config.cluster_metadata_refresh_interval,
config.replication_options,
)
.await?;

Expand Down

0 comments on commit bcd496f

Please sign in to comment.