Make metrics collection optional/faster #1147

Open · wants to merge 6 commits into base: main · showing changes from 1 commit
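The core change, visible in the first hunk below, is that the session now builds a single `Arc<Metrics>` up front and hands a clone of it to the connection pool configuration. A minimal sketch of why the sharing matters, using simplified stand-in types rather than the crate's own:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Simplified stand-in for the driver's Metrics struct.
#[derive(Default)]
struct Metrics {
    total_connections: AtomicU64,
}

fn main() {
    // One Metrics instance, created where the session is built...
    let session_metrics = Arc::new(Metrics::default());

    // ...and a clone of the same Arc handed down to the connection pool.
    let pool_metrics = Arc::clone(&session_metrics);

    // Counters bumped by the pool...
    pool_metrics.total_connections.fetch_add(1, Ordering::Relaxed);

    // ...are visible through the session's handle, since both Arcs
    // point at the same allocation.
    assert_eq!(session_metrics.total_connections.load(Ordering::Relaxed), 1);
}
```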
6 changes: 5 additions & 1 deletion scylla/src/client/session.rs
@@ -997,11 +997,14 @@ where
identity: config.identity,
};

let metrics = Arc::new(Metrics::new());

let pool_config = PoolConfig {
connection_config,
pool_size: config.connection_pool_size,
can_use_shard_aware_port: !config.disallow_shard_aware_port,
keepalive_interval: config.keepalive_interval,
metrics: Some(metrics.clone()),
};

let cluster = Cluster::new(
@@ -1021,7 +1024,7 @@ where
cluster,
default_execution_profile_handle,
schema_agreement_interval: config.schema_agreement_interval,
metrics: Arc::new(Metrics::new()),
metrics,
schema_agreement_timeout: config.schema_agreement_timeout,
schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
refresh_metadata_on_auto_schema_agreement: config
@@ -1978,6 +1981,7 @@ where
Some(timeout) => tokio::time::timeout(timeout, runner)
.await
.unwrap_or_else(|e| {
self.metrics.inc_request_timeouts();
Err(QueryError::RequestTimeout(format!(
"Request took longer than {}ms: {}",
timeout.as_millis(),
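The counter added in the last hunk above is bumped on the client side, when `tokio::time::timeout` fires before the request future finishes. A standalone sketch of that pattern, using a bare `AtomicU64` in place of the driver's `Metrics`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

static REQUEST_TIMEOUTS: AtomicU64 = AtomicU64::new(0);

async fn run_with_timeout(timeout: Duration) -> Result<&'static str, String> {
    // Stand-in for the request runner future.
    let runner = async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        Ok("response")
    };

    // Same shape as the driver code: if the timer fires first, count a
    // client-side request timeout and surface an error.
    tokio::time::timeout(timeout, runner)
        .await
        .unwrap_or_else(|_elapsed| {
            REQUEST_TIMEOUTS.fetch_add(1, Ordering::Relaxed);
            Err(format!("Request took longer than {}ms", timeout.as_millis()))
        })
}

#[tokio::main]
async fn main() {
    // A 10 ms budget against a 50 ms "request": the timeout path is taken.
    let _ = run_with_timeout(Duration::from_millis(10)).await;
    assert_eq!(REQUEST_TIMEOUTS.load(Ordering::Relaxed), 1);
}
```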
4 changes: 4 additions & 0 deletions scylla/src/cluster/metadata.rs
@@ -762,6 +762,10 @@ impl MetadataReader
// The shard-aware port won't be used with PerHost pool size anyway,
// so explicitly disable it here
can_use_shard_aware_port: false,

// TODO: This should probably be changed or removed depending on
// how we want to pass the metrics to the PoolRefiller
metrics: None,
};

NodeConnectionPool::new(endpoint, pool_config, None, refresh_requester)
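The TODO above leaves open how metrics should reach the PoolRefiller. One possible direction it hints at, sketched with hypothetical, simplified types (not the crate's actual API), is to pass the handle straight to the refiller's constructor instead of storing it in the config:

```rust
use std::sync::Arc;

// Hypothetical, simplified types; not the crate's actual API.
struct Metrics;

struct PoolRefiller {
    metrics: Option<Arc<Metrics>>,
}

impl PoolRefiller {
    // The metrics handle arrives as an explicit constructor argument,
    // so the pool config no longer has to carry it.
    fn new(metrics: Option<Arc<Metrics>>) -> Self {
        PoolRefiller { metrics }
    }

    fn is_collecting_metrics(&self) -> bool {
        self.metrics.is_some()
    }
}

fn main() {
    // Control connection used by the metadata reader: no metrics.
    let control = PoolRefiller::new(None);
    // Regular node pool: shares the session's metrics handle.
    let node = PoolRefiller::new(Some(Arc::new(Metrics)));

    assert!(!control.is_collecting_metrics());
    assert!(node.is_collecting_metrics());
}
```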
10 changes: 10 additions & 0 deletions scylla/src/network/connection.rs
@@ -21,6 +21,7 @@ use crate::frame::{
server_event_type::EventType,
FrameParams, SerializedRequest,
};
use crate::observability::metrics::Metrics;
use crate::policies::address_translator::AddressTranslator;
use crate::query::Query;
use crate::response::query_result::QueryResult;
@@ -1903,13 +1904,22 @@ pub(super) async fn open_connection_to_shard_aware_port(
shard: Shard,
sharder: Sharder,
connection_config: &ConnectionConfig,
metrics: Option<Arc<Metrics>>,
) -> Result<(Connection, ErrorReceiver), ConnectionError> {
// Create iterator over all possible source ports for this shard
let source_port_iter = sharder.iter_source_ports_for_shard(shard);

for port in source_port_iter {
let connect_result = open_connection(endpoint.clone(), Some(port), connection_config).await;

if let Some(metrics) = &metrics {
if connect_result.is_ok() {
metrics.inc_total_connections();
} else if let Err(ConnectionError::ConnectTimeout) = &connect_result {
metrics.inc_connection_timeouts();
}
}

match connect_result {
Err(err) if err.is_address_unavailable_for_use() => continue, // If we can't use this port, try the next one
result => return result,
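Because the metrics handle is optional here, the counters are only touched when one was supplied, and a connect timeout is counted separately from other connection errors. A self-contained sketch of that branch, with a simplified error type standing in for the driver's `ConnectionError`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Simplified stand-in for the driver's ConnectionError.
#[allow(dead_code)]
enum ConnectionError {
    ConnectTimeout,
    Broken,
}

#[derive(Default)]
struct Metrics {
    total_connections: AtomicU64,
    connection_timeouts: AtomicU64,
}

// Counters are only touched when a metrics handle was actually supplied.
fn record_connect_result<T>(
    metrics: &Option<Arc<Metrics>>,
    connect_result: &Result<T, ConnectionError>,
) {
    if let Some(metrics) = metrics {
        if connect_result.is_ok() {
            // A connection was opened successfully.
            metrics.total_connections.fetch_add(1, Ordering::Relaxed);
        } else if let Err(ConnectionError::ConnectTimeout) = connect_result {
            // Only connect timeouts are counted; other errors are not.
            metrics.connection_timeouts.fetch_add(1, Ordering::Relaxed);
        }
    }
}

fn main() {
    let metrics = Some(Arc::new(Metrics::default()));
    record_connect_result::<()>(&metrics, &Err(ConnectionError::ConnectTimeout));
    record_connect_result(&metrics, &Ok(()));

    let m = metrics.as_ref().unwrap();
    assert_eq!(m.total_connections.load(Ordering::Relaxed), 1);
    assert_eq!(m.connection_timeouts.load(Ordering::Relaxed), 1);
}
```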
26 changes: 26 additions & 0 deletions scylla/src/network/connection_pool.rs
@@ -12,6 +12,8 @@ use crate::routing::{Shard, ShardCount, Sharder};

use crate::cluster::metadata::{PeerEndpoint, UntranslatedEndpoint};

use crate::observability::metrics::Metrics;

#[cfg(feature = "cloud")]
use crate::cluster::node::resolve_hostname;

@@ -61,6 +63,11 @@ pub(crate) struct PoolConfig {
pub(crate) pool_size: PoolSize,
pub(crate) can_use_shard_aware_port: bool,
pub(crate) keepalive_interval: Option<Duration>,
// TODO: The metrics should definitely not be stored here,
// but it was the easiest way to pass it to the refiller.
// It could be refactored to be passed as a parameter to the refiller,
// but it would require a lot of changes in the code.
pub(crate) metrics: Option<Arc<Metrics>>,
}

impl Default for PoolConfig {
@@ -70,6 +77,7 @@ impl Default for PoolConfig {
pool_size: Default::default(),
can_use_shard_aware_port: true,
keepalive_interval: None,
metrics: None,
}
}
}
@@ -922,6 +930,8 @@ impl PoolRefiller {
// As this may involve resolving a hostname, the whole operation is async.
let endpoint_fut = self.maybe_translate_for_serverless(endpoint);

let metrics = self.pool_config.metrics.clone();

let fut = match (self.sharder.clone(), self.shard_aware_port, shard) {
(Some(sharder), Some(port), Some(shard)) => async move {
let shard_aware_endpoint = {
@@ -934,6 +944,7 @@
shard,
sharder.clone(),
&cfg,
metrics,
)
.await;
OpenedConnectionEvent {
@@ -946,6 +957,15 @@
_ => async move {
let non_shard_aware_endpoint = endpoint_fut.await;
let result = open_connection(non_shard_aware_endpoint, None, &cfg).await;

if let Some(metrics) = metrics {
if result.is_ok() {
metrics.inc_total_connections();
} else if let Err(ConnectionError::ConnectTimeout) = &result {
metrics.inc_connection_timeouts();
}
}

OpenedConnectionEvent {
result,
requested_shard: None,
@@ -1022,6 +1042,11 @@ impl PoolRefiller {
match maybe_idx {
Some(idx) => {
v.swap_remove(idx);
self.pool_config
.metrics
.as_ref()
.unwrap()
.dec_total_connections();
true
}
None => false,
@@ -1242,6 +1267,7 @@ mod tests {
0,
sharder.clone(),
&connection_config,
None,
));
}

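As the TODO notes, carrying the handle in the pool configuration is a pragmatic shortcut: the Default impl leaves metrics off (`None`), and callers opt in with `Some(...)`. A simplified illustration of that config shape (not the crate's actual struct):

```rust
use std::sync::Arc;
use std::time::Duration;

struct Metrics;

// Simplified stand-in for the pool configuration.
#[allow(dead_code)]
struct PoolConfig {
    can_use_shard_aware_port: bool,
    keepalive_interval: Option<Duration>,
    // Optional, so pools that should not collect metrics can skip it.
    metrics: Option<Arc<Metrics>>,
}

impl Default for PoolConfig {
    fn default() -> Self {
        PoolConfig {
            can_use_shard_aware_port: true,
            keepalive_interval: None,
            metrics: None, // collection is off unless a caller opts in
        }
    }
}

fn main() {
    // Session path: opt in by sharing the session's metrics handle.
    let with_metrics = PoolConfig {
        metrics: Some(Arc::new(Metrics)),
        ..Default::default()
    };
    // Control/metadata path: rely on the Default, which disables collection.
    let without_metrics = PoolConfig::default();

    assert!(with_metrics.metrics.is_some());
    assert!(without_metrics.metrics.is_none());
}
```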
41 changes: 41 additions & 0 deletions scylla/src/observability/metrics.rs
@@ -180,6 +180,9 @@ pub struct Metrics {
retries_num: AtomicU64,
histogram: Arc<LockFreeHistogram>,
meter: Arc<Meter>,
total_connections: AtomicU64,
connection_timeouts: AtomicU64,
request_timeouts: AtomicU64,
}

impl Metrics {
@@ -192,6 +195,9 @@ impl Metrics {
retries_num: AtomicU64::new(0),
histogram: Arc::new(LockFreeHistogram::default()),
meter: Arc::new(Meter::new()),
total_connections: AtomicU64::new(0),
connection_timeouts: AtomicU64::new(0),
request_timeouts: AtomicU64::new(0),
}
}

@@ -223,6 +229,26 @@ impl Metrics {
self.retries_num.fetch_add(1, ORDER_TYPE);
}

/// Increments counter for total number of connections
pub(crate) fn inc_total_connections(&self) {
self.total_connections.fetch_add(1, ORDER_TYPE);
}

/// Decrements counter for total number of connections
pub(crate) fn dec_total_connections(&self) {
self.total_connections.fetch_sub(1, ORDER_TYPE);
}
Comment on lines +232 to +240

Collaborator: ❓ These docstrings pretty much repeat what's said in the functions' names, but they don't tell me when I should call those.

Author: Those functions are not meant to be called by the user. Nevertheless, I will adjust the docstrings to better describe the logic behind these functions, so that devs can understand them and use them properly in other places in the future if needed.

Collaborator (quoting "for devs to understand and use properly if needed in other places in the future"): That's what I had in mind - internal usage.

/// Increments counter for connection timeouts
pub(crate) fn inc_connection_timeouts(&self) {
self.connection_timeouts.fetch_add(1, ORDER_TYPE);
}

/// Increments counter for request timeouts
pub(crate) fn inc_request_timeouts(&self) {
self.request_timeouts.fetch_add(1, ORDER_TYPE);
}
Comment on lines +247 to +250

Collaborator: ❓ Is this counter intended to measure client-side request timeouts, server-side request timeouts, or both?

Author: AFAIK this metric was used in cpp-driver to count client timeouts on requests to the database. That is how the method is used in our implementation as well: it is called alongside returning QueryError::RequestTimeout, whose docstring notes that it concerns the client side. I think this should be specified in the docstring of our inc method too, so I will adjust it.

/// Saves to histogram latency of completing single query.
/// For paged queries it should log latency for every page.
///
@@ -324,6 +350,21 @@ impl Metrics {
self.meter.fifteen_minute_rate()
}

/// Returns total number of active connections
pub fn get_total_connections(&self) -> u64 {
self.total_connections.load(ORDER_TYPE)
}

/// Returns counter for connection timeouts
pub fn get_connection_timeouts(&self) -> u64 {
self.connection_timeouts.load(ORDER_TYPE)
}

/// Returns counter for request timeouts
pub fn get_request_timeouts(&self) -> u64 {
self.request_timeouts.load(ORDER_TYPE)
}

// Metric implementations

fn mean(h: Histogram) -> Result<u64, MetricsError> {

Contributor: I wonder why they decided to remove the Histogram::mean() method.

Reply: The histogram has no way of providing a true mean. Do we use the lower or upper end of the bucket range while calculating? Somewhere in the middle? It seems more appropriate to let the caller decide how they want to handle this detail. The same goes for determining a percentile: the best we can do is return the Bucket whose range the percentile falls into. What value to report may depend on your use case. The previous assumption of over-reporting latencies by using the upper edge of the bucket might not be appropriate for all use cases.

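To make that trade-off concrete, here is a hedged sketch of approximating a mean from bucket data using bucket midpoints; the bucket triples are generic and not the histogram crate's API, and choosing the low or high edge instead would under- or over-report:

```rust
/// Approximate the mean of a histogram given (bucket_low, bucket_high, count)
/// triples. The choice of the bucket midpoint is arbitrary: using `high`
/// would over-report latencies, using `low` would under-report them.
fn approximate_mean(buckets: &[(u64, u64, u64)]) -> Option<f64> {
    let total: u64 = buckets.iter().map(|&(_, _, count)| count).sum();
    if total == 0 {
        return None; // no samples, no mean
    }
    let weighted_sum: f64 = buckets
        .iter()
        .map(|&(low, high, count)| ((low + high) as f64 / 2.0) * count as f64)
        .sum();
    Some(weighted_sum / total as f64)
}

fn main() {
    // Three buckets: 0-1ms with 10 samples, 1-2ms with 5, 2-4ms with 1.
    let buckets = [(0, 1, 10), (1, 2, 5), (2, 4, 1)];
    // (0.5*10 + 1.5*5 + 3.0*1) / 16 = 15.5 / 16 ≈ 0.97
    println!("approx mean: {:?}", approximate_mean(&buckets));
}
```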
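Once merged, reading the new counters should look much like reading the existing ones, via the session's metrics handle. A hedged usage sketch; the contact point is a placeholder and the SessionBuilder/get_metrics() calls are assumed from the driver's existing examples:

```rust
use scylla::SessionBuilder;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Placeholder contact point; adjust for your cluster.
    let session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .build()
        .await?;

    // The same Arc<Metrics> that the pool and request paths increment,
    // including the counters introduced in this PR.
    let metrics = session.get_metrics();
    println!("open connections:    {}", metrics.get_total_connections());
    println!("connection timeouts: {}", metrics.get_connection_timeouts());
    println!("request timeouts:    {}", metrics.get_request_timeouts());

    Ok(())
}
```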