-
Notifications
You must be signed in to change notification settings - Fork 112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ReplicationConfigs to SessionConfig for excluding keyspaces from replica precompute #831
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -22,13 +22,14 @@ use std::collections::HashMap; | |||||||||||||||||||||||||||
use std::net::SocketAddr; | ||||||||||||||||||||||||||||
use std::sync::Arc; | ||||||||||||||||||||||||||||
use std::time::Duration; | ||||||||||||||||||||||||||||
use std::vec; | ||||||||||||||||||||||||||||
use tracing::instrument::WithSubscriber; | ||||||||||||||||||||||||||||
use tracing::{debug, warn}; | ||||||||||||||||||||||||||||
use uuid::Uuid; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
use super::node::{KnownNode, NodeAddr}; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
use super::locator::ReplicaLocator; | ||||||||||||||||||||||||||||
use super::locator::{ReplicaLocator, ReplicationConfigs}; | ||||||||||||||||||||||||||||
use super::partitioner::calculate_token_for_partition_key; | ||||||||||||||||||||||||||||
use super::topology::Strategy; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
|
@@ -68,6 +69,7 @@ pub struct ClusterData { | |||||||||||||||||||||||||||
pub(crate) known_peers: HashMap<Uuid, Arc<Node>>, // Invariant: nonempty after Cluster::new() | ||||||||||||||||||||||||||||
pub(crate) keyspaces: HashMap<String, Keyspace>, | ||||||||||||||||||||||||||||
pub(crate) locator: ReplicaLocator, | ||||||||||||||||||||||||||||
pub(crate) replication_configs: ReplicationConfigs, | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
/// Enables printing [ClusterData] struct in a neat way, skipping the clutter involved by | ||||||||||||||||||||||||||||
|
@@ -145,6 +147,7 @@ impl Cluster { | |||||||||||||||||||||||||||
fetch_schema_metadata: bool, | ||||||||||||||||||||||||||||
host_filter: Option<Arc<dyn HostFilter>>, | ||||||||||||||||||||||||||||
cluster_metadata_refresh_interval: Duration, | ||||||||||||||||||||||||||||
replication_opts: Option<&ReplicationConfigs>, | ||||||||||||||||||||||||||||
) -> Result<Cluster, NewSessionError> { | ||||||||||||||||||||||||||||
let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32); | ||||||||||||||||||||||||||||
let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32); | ||||||||||||||||||||||||||||
|
@@ -171,6 +174,7 @@ impl Cluster { | |||||||||||||||||||||||||||
&HashMap::new(), | ||||||||||||||||||||||||||||
&None, | ||||||||||||||||||||||||||||
host_filter.as_deref(), | ||||||||||||||||||||||||||||
replication_opts, | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
.await; | ||||||||||||||||||||||||||||
cluster_data.wait_until_all_pools_are_initialized().await; | ||||||||||||||||||||||||||||
|
@@ -274,6 +278,7 @@ impl ClusterData { | |||||||||||||||||||||||||||
known_peers: &HashMap<Uuid, Arc<Node>>, | ||||||||||||||||||||||||||||
used_keyspace: &Option<VerifiedKeyspaceName>, | ||||||||||||||||||||||||||||
host_filter: Option<&dyn HostFilter>, | ||||||||||||||||||||||||||||
replication_opts: Option<&ReplicationConfigs>, | ||||||||||||||||||||||||||||
) -> Self { | ||||||||||||||||||||||||||||
// Create new updated known_peers and ring | ||||||||||||||||||||||||||||
let mut new_known_peers: HashMap<Uuid, Arc<Node>> = | ||||||||||||||||||||||||||||
|
@@ -339,8 +344,16 @@ impl ClusterData { | |||||||||||||||||||||||||||
Self::update_rack_count(&mut datacenters); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
let keyspaces = metadata.keyspaces; | ||||||||||||||||||||||||||||
// Exclude these keyspaces from replica set precomputation | ||||||||||||||||||||||||||||
let keyspaces_to_exclude = match replication_opts { | ||||||||||||||||||||||||||||
Some(opts) => opts.keyspaces_to_exclude().to_vec(), | ||||||||||||||||||||||||||||
None => vec![], | ||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||
Comment on lines
+347
to
+351
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better not to allocate here. Let's just use the borrowed
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even better, let's allow for both Allowlist and Denylist variant:
Suggested change
|
||||||||||||||||||||||||||||
let (locator, keyspaces) = tokio::task::spawn_blocking(move || { | ||||||||||||||||||||||||||||
let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy); | ||||||||||||||||||||||||||||
let keyspace_strategies = keyspaces | ||||||||||||||||||||||||||||
.iter() | ||||||||||||||||||||||||||||
.filter(|(k, _)| !keyspaces_to_exclude.contains(k)) | ||||||||||||||||||||||||||||
.map(|(_, ks)| &ks.strategy); | ||||||||||||||||||||||||||||
Comment on lines
+353
to
+356
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||
let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies); | ||||||||||||||||||||||||||||
(locator, keyspaces) | ||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||
|
@@ -351,6 +364,10 @@ impl ClusterData { | |||||||||||||||||||||||||||
known_peers: new_known_peers, | ||||||||||||||||||||||||||||
keyspaces, | ||||||||||||||||||||||||||||
locator, | ||||||||||||||||||||||||||||
replication_configs: match replication_opts { | ||||||||||||||||||||||||||||
Some(opts) => opts.clone(), | ||||||||||||||||||||||||||||
None => ReplicationConfigs::default(), | ||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
|
@@ -662,6 +679,7 @@ impl ClusterWorker { | |||||||||||||||||||||||||||
&cluster_data.known_peers, | ||||||||||||||||||||||||||||
&self.used_keyspace, | ||||||||||||||||||||||||||||
self.host_filter.as_deref(), | ||||||||||||||||||||||||||||
Some(&cluster_data.replication_configs), | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
.await, | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -21,6 +21,23 @@ use std::{ | |||||
}; | ||||||
use tracing::debug; | ||||||
|
||||||
/// TODO(michael): Docs | ||||||
#[derive(Debug, Default, Clone)] | ||||||
pub struct ReplicationConfigs { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
// Keyspaces to exclude from precomuted replica sets | ||||||
keyspaces_to_exclude: Vec<String>, | ||||||
} | ||||||
|
||||||
impl ReplicationConfigs { | ||||||
pub fn exclude_keyspace(&mut self, keyspace: String) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
self.keyspaces_to_exclude.push(keyspace) | ||||||
} | ||||||
|
||||||
pub fn keyspaces_to_exclude(&self) -> &Vec<String> { | ||||||
&self.keyspaces_to_exclude | ||||||
} | ||||||
} | ||||||
|
||||||
Comment on lines
+36
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's create a enum ReplicasPrecomputationList {
Allowlist { allowed_keyspaces: Vec<String> },
Denylist { disallowed_keyspaces: Vec<String> },
} and store it |
||||||
/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication | ||||||
/// strategy) pair. It does so by either using the precomputed token ranges, or doing the | ||||||
/// computation on the fly. | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,6 +60,7 @@ use crate::transport::connection_pool::PoolConfig; | |
use crate::transport::host_filter::HostFilter; | ||
use crate::transport::iterator::{PreparedIteratorConfig, RowIterator}; | ||
use crate::transport::load_balancing::{self, RoutingInfo}; | ||
use crate::transport::locator::ReplicationConfigs; | ||
use crate::transport::metrics::Metrics; | ||
use crate::transport::node::Node; | ||
use crate::transport::query_result::QueryResult; | ||
|
@@ -285,6 +286,9 @@ pub struct SessionConfig { | |
/// for e.g: if they do not want unexpected traffic | ||
/// or they expect the topology to change frequently. | ||
pub cluster_metadata_refresh_interval: Duration, | ||
|
||
/// TODO(michael): Docs ... | ||
pub replication_configs: ReplicationConfigs, | ||
Comment on lines
+289
to
+291
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
impl SessionConfig { | ||
|
@@ -331,6 +335,7 @@ impl SessionConfig { | |
tracing_info_fetch_interval: Duration::from_millis(3), | ||
tracing_info_fetch_consistency: Consistency::One, | ||
cluster_metadata_refresh_interval: Duration::from_secs(60), | ||
replication_configs: ReplicationConfigs::default(), | ||
} | ||
} | ||
|
||
|
@@ -524,6 +529,7 @@ impl Session { | |
config.fetch_schema_metadata, | ||
config.host_filter, | ||
config.cluster_metadata_refresh_interval, | ||
Some(&config.replication_configs), | ||
) | ||
.await?; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not make it an
Option
;ReplicationConfig
should be always passed to whereReplicaLocator
is constructed.