Skip to content

Commit

Permalink
metrics: make metrics optional under crate feature
Browse files Browse the repository at this point in the history
Add metrics crate feature which enables usage
and gathering of metrics.

Therefore everyone willing to use metrics in their code
is required to add metrics feature in their Cargo.toml
file or compile otherwise with --features metrics flag.
  • Loading branch information
QuerthDP authored and NikodemGapski committed Jan 14, 2025
1 parent d73456f commit db9d17b
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 4 deletions.
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ scylla = { path = "../scylla", features = [
"num-bigint-03",
"num-bigint-04",
"bigdecimal-04",
"metrics",
] }
tokio = { version = "1.34", features = ["full"] }
tracing = { version = "0.1.25", features = ["log"] }
Expand Down
3 changes: 2 additions & 1 deletion scylla/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ full-serialization = [
"num-bigint-04",
"bigdecimal-04",
]
metrics = ["dep:histogram"]

[dependencies]
scylla-macros = { version = "0.7.0", path = "../scylla-macros" }
Expand All @@ -47,7 +48,7 @@ byteorder = "1.3.4"
bytes = "1.0.1"
futures = "0.3.6"
hashbrown = "0.14"
histogram = "0.11.1"
histogram = { version = "0.11.1", optional = true }
tokio = { version = "1.34", features = [
"net",
"time",
Expand Down
13 changes: 12 additions & 1 deletion scylla/src/client/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::frame::response::result;
use crate::network::Connection;
use crate::observability::driver_tracing::RequestSpan;
use crate::observability::history::{self, HistoryListener};
#[cfg(feature = "metrics")]
use crate::observability::metrics::Metrics;
use crate::policies::load_balancing::{self, RoutingInfo};
use crate::policies::retry::{QueryInfo, RetryDecision, RetrySession};
Expand Down Expand Up @@ -68,6 +69,7 @@ pub(crate) struct PreparedIteratorConfig {
pub(crate) values: SerializedValues,
pub(crate) execution_profile: Arc<ExecutionProfileInner>,
pub(crate) cluster_data: Arc<ClusterState>,
#[cfg(feature = "metrics")]
pub(crate) metrics: Arc<Metrics>,
}

Expand Down Expand Up @@ -143,6 +145,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
query_consistency: Consistency,
retry_session: Box<dyn RetrySession>,
execution_profile: Arc<ExecutionProfileInner>,
#[cfg(feature = "metrics")]
metrics: Arc<Metrics>,

paging_state: PagingState,
Expand Down Expand Up @@ -237,11 +240,13 @@ where
self.log_attempt_error(&last_error, &retry_decision);
match retry_decision {
RetryDecision::RetrySameNode(cl) => {
#[cfg(feature = "metrics")]
self.metrics.inc_retries_num();
current_consistency = cl.unwrap_or(current_consistency);
continue 'same_node_retries;
}
RetryDecision::RetryNextNode(cl) => {
#[cfg(feature = "metrics")]
self.metrics.inc_retries_num();
current_consistency = cl.unwrap_or(current_consistency);
continue 'nodes_in_plan;
Expand Down Expand Up @@ -299,6 +304,7 @@ where
node: NodeRef<'_>,
request_span: &RequestSpan,
) -> Result<ControlFlow<PageSendAttemptedProof, ()>, QueryError> {
#[cfg(feature = "metrics")]
self.metrics.inc_total_paged_queries();
let query_start = std::time::Instant::now();

Expand All @@ -324,6 +330,7 @@ where
tracing_id,
..
}) => {
#[cfg(feature = "metrics")]
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
self.log_attempt_success();
self.log_query_success();
Expand Down Expand Up @@ -360,6 +367,7 @@ where
}
Err(err) => {
let err = err.into();
#[cfg(feature = "metrics")]
self.metrics.inc_failed_paged_queries();
self.execution_profile
.load_balancing_policy
Expand All @@ -379,6 +387,7 @@ where
Ok(ControlFlow::Break(proof))
}
Ok(response) => {
#[cfg(feature = "metrics")]
self.metrics.inc_failed_paged_queries();
let err =
ProtocolError::UnexpectedResponse(response.response.to_response_kind()).into();
Expand Down Expand Up @@ -682,7 +691,7 @@ impl QueryPager {
query: Query,
execution_profile: Arc<ExecutionProfileInner>,
cluster_data: Arc<ClusterState>,
metrics: Arc<Metrics>,
#[cfg(feature = "metrics")] metrics: Arc<Metrics>,
) -> Result<Self, QueryError> {
let (sender, receiver) = mpsc::channel(1);

Expand Down Expand Up @@ -745,6 +754,7 @@ impl QueryPager {
query_consistency: consistency,
retry_session,
execution_profile,
#[cfg(feature = "metrics")]
metrics,
paging_state: PagingState::start(),
history_listener: query.config.history_listener.clone(),
Expand Down Expand Up @@ -863,6 +873,7 @@ impl QueryPager {
query_consistency: consistency,
retry_session,
execution_profile: config.execution_profile,
#[cfg(feature = "metrics")]
metrics: config.metrics,
paging_state: PagingState::start(),
history_listener: config.prepared.config.history_listener.clone(),
Expand Down
35 changes: 35 additions & 0 deletions scylla/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::network::SslConfig;
use crate::network::{Connection, ConnectionConfig, PoolConfig, VerifiedKeyspaceName};
use crate::observability::driver_tracing::RequestSpan;
use crate::observability::history::{self, HistoryListener};
#[cfg(feature = "metrics")]
use crate::observability::metrics::Metrics;
use crate::observability::tracing::TracingInfo;
use crate::policies::address_translator::AddressTranslator;
Expand Down Expand Up @@ -100,6 +101,7 @@ where
cluster: Cluster,
default_execution_profile_handle: ExecutionProfileHandle,
schema_agreement_interval: Duration,
#[cfg(feature = "metrics")]
metrics: Arc<Metrics>,
schema_agreement_timeout: Duration,
schema_agreement_automatic_waiting: bool,
Expand All @@ -125,6 +127,7 @@ impl<DeserApi> std::fmt::Debug for GenericSession<DeserApi>
where
DeserApi: DeserializationApiKind,
{
#[cfg(feature = "metrics")]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Session")
.field("cluster", &ClusterNeatDebug(&self.cluster))
Expand All @@ -140,6 +143,22 @@ where
)
.finish()
}

#[cfg(not(feature = "metrics"))]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Session")
.field("cluster", &ClusterNeatDebug(&self.cluster))
.field(
"default_execution_profile_handle",
&self.default_execution_profile_handle,
)
.field("schema_agreement_interval", &self.schema_agreement_interval)
.field(
"auto_await_schema_agreement_timeout",
&self.schema_agreement_timeout,
)
.finish()
}
}

/// Configuration options for [`Session`].
Expand Down Expand Up @@ -790,6 +809,7 @@ impl GenericSession<CurrentDeserializationApi> {
LegacySession {
cluster: self.cluster.clone(),
default_execution_profile_handle: self.default_execution_profile_handle.clone(),
#[cfg(feature = "metrics")]
metrics: self.metrics.clone(),
refresh_metadata_on_auto_schema_agreement: self
.refresh_metadata_on_auto_schema_agreement,
Expand Down Expand Up @@ -900,6 +920,7 @@ impl GenericSession<LegacyDeserializationApi> {
Session {
cluster: self.cluster.clone(),
default_execution_profile_handle: self.default_execution_profile_handle.clone(),
#[cfg(feature = "metrics")]
metrics: self.metrics.clone(),
refresh_metadata_on_auto_schema_agreement: self
.refresh_metadata_on_auto_schema_agreement,
Expand Down Expand Up @@ -997,13 +1018,15 @@ where
identity: config.identity,
};

#[cfg(feature = "metrics")]
let metrics = Arc::new(Metrics::new());

let pool_config = PoolConfig {
connection_config,
pool_size: config.connection_pool_size,
can_use_shard_aware_port: !config.disallow_shard_aware_port,
keepalive_interval: config.keepalive_interval,
#[cfg(feature = "metrics")]
metrics: Some(metrics.clone()),
};

Expand All @@ -1024,6 +1047,7 @@ where
cluster,
default_execution_profile_handle,
schema_agreement_interval: config.schema_agreement_interval,
#[cfg(feature = "metrics")]
metrics,
schema_agreement_timeout: config.schema_agreement_timeout,
schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
Expand Down Expand Up @@ -1234,6 +1258,7 @@ where
query,
execution_profile,
self.cluster.get_data(),
#[cfg(feature = "metrics")]
self.metrics.clone(),
)
.await
Expand All @@ -1248,6 +1273,7 @@ where
values,
execution_profile,
cluster_data: self.cluster.get_data(),
#[cfg(feature = "metrics")]
metrics: self.metrics.clone(),
})
.await
Expand Down Expand Up @@ -1502,6 +1528,7 @@ where
values: serialized_values,
execution_profile,
cluster_data: self.cluster.get_data(),
#[cfg(feature = "metrics")]
metrics: self.metrics.clone(),
})
.await
Expand Down Expand Up @@ -1713,6 +1740,7 @@ where
/// Access metrics collected by the driver\
/// Driver collects various metrics like number of queries or query latencies.
/// They can be read using this method
#[cfg(feature = "metrics")]
pub fn get_metrics(&self) -> Arc<Metrics> {
self.metrics.clone()
}
Expand Down Expand Up @@ -1934,6 +1962,7 @@ where
};

let context = speculative_execution::Context {
#[cfg(feature = "metrics")]
metrics: self.metrics.clone(),
};

Expand Down Expand Up @@ -1979,6 +2008,7 @@ where
Some(timeout) => tokio::time::timeout(timeout, runner)
.await
.unwrap_or_else(|e| {
#[cfg(feature = "metrics")]
self.metrics.inc_request_timeouts();
Err(QueryError::RequestTimeout(format!(
"Request took longer than {}ms: {}",
Expand Down Expand Up @@ -2039,6 +2069,7 @@ where
};
context.request_span.record_shard_id(&connection);

#[cfg(feature = "metrics")]
self.metrics.inc_total_nonpaged_queries();
let request_start = std::time::Instant::now();

Expand All @@ -2058,6 +2089,7 @@ where
last_error = match request_result {
Ok(response) => {
trace!(parent: &span, "Request succeeded");
#[cfg(feature = "metrics")]
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
context.log_attempt_success(&attempt_id);
execution_profile.load_balancing_policy.on_query_success(
Expand All @@ -2073,6 +2105,7 @@ where
last_error = %e,
"Request failed"
);
#[cfg(feature = "metrics")]
self.metrics.inc_failed_nonpaged_queries();
execution_profile.load_balancing_policy.on_query_failure(
context.query_info,
Expand Down Expand Up @@ -2102,11 +2135,13 @@ where
context.log_attempt_error(&attempt_id, the_error, &retry_decision);
match retry_decision {
RetryDecision::RetrySameNode(new_cl) => {
#[cfg(feature = "metrics")]
self.metrics.inc_retries_num();
current_consistency = new_cl.unwrap_or(current_consistency);
continue 'same_node_retries;
}
RetryDecision::RetryNextNode(new_cl) => {
#[cfg(feature = "metrics")]
self.metrics.inc_retries_num();
current_consistency = new_cl.unwrap_or(current_consistency);
continue 'nodes_in_plan;
Expand Down
1 change: 1 addition & 0 deletions scylla/src/cluster/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ impl MetadataReader {
// so explicitly disable it here
can_use_shard_aware_port: false,

#[cfg(feature = "metrics")]
// TODO: This should probably be changed or removed depending on
// how we want to pass the metrics to the PoolRefiller
metrics: None,
Expand Down
4 changes: 3 additions & 1 deletion scylla/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::frame::{
server_event_type::EventType,
FrameParams, SerializedRequest,
};
#[cfg(feature = "metrics")]
use crate::observability::metrics::Metrics;
use crate::policies::address_translator::AddressTranslator;
use crate::query::Query;
Expand Down Expand Up @@ -1898,14 +1899,15 @@ pub(super) async fn open_connection_to_shard_aware_port(
shard: Shard,
sharder: Sharder,
connection_config: &ConnectionConfig,
metrics: Option<Arc<Metrics>>,
#[cfg(feature = "metrics")] metrics: Option<Arc<Metrics>>,
) -> Result<(Connection, ErrorReceiver), ConnectionError> {
// Create iterator over all possible source ports for this shard
let source_port_iter = sharder.iter_source_ports_for_shard(shard);

for port in source_port_iter {
let connect_result = open_connection(endpoint.clone(), Some(port), connection_config).await;

#[cfg(feature = "metrics")]
if let Some(metrics) = &metrics {
if connect_result.is_ok() {
metrics.inc_total_connections();
Expand Down
8 changes: 8 additions & 0 deletions scylla/src/network/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::routing::{Shard, ShardCount, Sharder};

use crate::cluster::metadata::{PeerEndpoint, UntranslatedEndpoint};

#[cfg(feature = "metrics")]
use crate::observability::metrics::Metrics;

#[cfg(feature = "cloud")]
Expand Down Expand Up @@ -62,6 +63,7 @@ pub(crate) struct PoolConfig {
pub(crate) pool_size: PoolSize,
pub(crate) can_use_shard_aware_port: bool,
pub(crate) keepalive_interval: Option<Duration>,
#[cfg(feature = "metrics")]
// TODO: The metrics should definitely not be stored here,
// but it was the easiest way to pass it to the refiller.
// It could be refactored to be passed as a parameter to the refiller,
Expand All @@ -76,6 +78,7 @@ impl Default for PoolConfig {
pool_size: Default::default(),
can_use_shard_aware_port: true,
keepalive_interval: None,
#[cfg(feature = "metrics")]
metrics: None,
}
}
Expand Down Expand Up @@ -932,6 +935,7 @@ impl PoolRefiller {
// As this may may involve resolving a hostname, the whole operation is async.
let endpoint_fut = self.maybe_translate_for_serverless(endpoint);

#[cfg(feature = "metrics")]
let metrics = self.pool_config.metrics.clone();

let fut = match (self.sharder.clone(), self.shard_aware_port, shard) {
Expand All @@ -946,6 +950,7 @@ impl PoolRefiller {
shard,
sharder.clone(),
&cfg,
#[cfg(feature = "metrics")]
metrics,
)
.await;
Expand All @@ -960,6 +965,7 @@ impl PoolRefiller {
let non_shard_aware_endpoint = endpoint_fut.await;
let result = open_connection(non_shard_aware_endpoint, None, &cfg).await;

#[cfg(feature = "metrics")]
if let Some(metrics) = metrics {
if result.is_ok() {
metrics.inc_total_connections();
Expand Down Expand Up @@ -1044,6 +1050,7 @@ impl PoolRefiller {
match maybe_idx {
Some(idx) => {
v.swap_remove(idx);
#[cfg(feature = "metrics")]
self.pool_config
.metrics
.as_ref()
Expand Down Expand Up @@ -1269,6 +1276,7 @@ mod tests {
0,
sharder.clone(),
&connection_config,
#[cfg(feature = "metrics")]
None,
));
}
Expand Down
2 changes: 2 additions & 0 deletions scylla/src/observability/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
pub(crate) mod driver_tracing;
pub mod history;
#[cfg(feature = "metrics")]
pub mod lock_free_histogram;
#[cfg(feature = "metrics")]
pub mod metrics;
pub mod tracing;
Loading

0 comments on commit db9d17b

Please sign in to comment.