diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index ef687882c16..57b24f74c3e 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -90,25 +90,6 @@ pub fn new_counter( counter } -pub fn new_counter_with_labels( - name: &str, - help: &str, - subsystem: &str, - const_labels: &[(&str, &str)], -) -> IntCounter { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); - let counter_opts = Opts::new(name, help) - .namespace("quickwit") - .subsystem(subsystem) - .const_labels(owned_const_labels); - let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter"); - prometheus::register(Box::new(counter.clone())).expect("failed to register counter"); - counter -} - pub fn new_counter_vec( name: &str, help: &str, diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index 287c8711377..35b7d3115c5 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -70,7 +70,7 @@ impl Default for SearchMetrics { ), root_search_request_duration_seconds: new_histogram_vec( "root_search_request_duration_seconds", - "Duration of root search gRPC request in seconds.", + "Duration of root search gRPC requests in seconds.", "search", &[("kind", "server")], ["status"], @@ -93,7 +93,7 @@ impl Default for SearchMetrics { ), leaf_search_request_duration_seconds: new_histogram_vec( "leaf_search_request_duration_seconds", - "Duration of leaf search gRPC request in seconds.", + "Duration of leaf search gRPC requests in seconds.", "search", &[("kind", "server")], ["status"], diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 77287de891b..0fecc17a32a 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -21,8 +21,8 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - new_counter, new_counter_vec, new_counter_with_labels, new_gauge, IntCounter, IntCounterVec, - IntGauge, + exponential_buckets, new_counter, new_counter_vec, new_gauge, new_histogram_vec, Histogram, + IntCounter, IntCounterVec, IntGauge, }; /// Counters associated to storage operations. @@ -41,6 +41,11 @@ pub struct StorageMetrics { pub object_storage_put_parts: IntCounter, pub object_storage_download_num_bytes: IntCounter, pub object_storage_upload_num_bytes: IntCounter, + pub object_storage_get_requests_duration: Histogram, + pub object_storage_put_requests_duration: Histogram, + pub object_storage_upload_part_requests_duration: Histogram, + pub object_storage_delete_requests_duration: Histogram, + pub object_storage_batch_delete_requests_duration: Histogram, } impl Default for StorageMetrics { @@ -60,6 +65,26 @@ impl Default for StorageMetrics { ]; let get_slice_timeout_all_timeouts = get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]); + + let object_storage_requests_duration = new_histogram_vec( + "object_storage_request_duration_seconds", + "Duration of object storage requests in seconds.", + "storage", + &[], + ["action"], + exponential_buckets(0.001, 2.0, 15).unwrap(), + ); + let object_storage_get_requests_duration = + object_storage_requests_duration.with_label_values(["get_object"]); + let object_storage_put_requests_duration = + object_storage_requests_duration.with_label_values(["put_object"]); + let object_storage_upload_part_requests_duration = + object_storage_requests_duration.with_label_values(["upload_part"]); + let object_storage_delete_requests_duration = + object_storage_requests_duration.with_label_values(["delete_object"]); + let object_storage_batch_delete_requests_duration = + object_storage_requests_duration.with_label_values(["delete_objects"]); + StorageMetrics { fast_field_cache: CacheMetrics::for_component("fastfields"), fd_cache_metrics: CacheMetrics::for_component("fd"), @@ -107,6 +132,11 @@ impl Default for StorageMetrics { "storage", &[], ), + object_storage_get_requests_duration, + object_storage_put_requests_duration, + object_storage_upload_part_requests_duration, + object_storage_delete_requests_duration, + object_storage_batch_delete_requests_duration, } } } @@ -139,19 +169,19 @@ impl CacheMetrics { CACHE_METRICS_NAMESPACE, &[("component_name", component_name)], ), - hits_num_items: new_counter_with_labels( + hits_num_items: new_counter( "cache_hits_total", "Number of cache hits by component", CACHE_METRICS_NAMESPACE, &[("component_name", component_name)], ), - hits_num_bytes: new_counter_with_labels( + hits_num_bytes: new_counter( "cache_hits_bytes", "Number of cache hits in bytes by component", CACHE_METRICS_NAMESPACE, &[("component_name", component_name)], ), - misses_num_items: new_counter_with_labels( + misses_num_items: new_counter( "cache_misses_total", "Number of cache misses by component", CACHE_METRICS_NAMESPACE, diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 86ef692c671..007ca103773 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -289,6 +289,15 @@ impl S3CompatibleObjectStorage { .byte_stream() .await .map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?; + + crate::STORAGE_METRICS.object_storage_put_parts.inc(); + crate::STORAGE_METRICS + .object_storage_upload_num_bytes + .inc_by(len); + let _timer = crate::STORAGE_METRICS + .object_storage_put_requests_duration + .start_timer(); + self.s3_client .put_object() .bucket(bucket) @@ -304,11 +313,6 @@ impl S3CompatibleObjectStorage { Retry::Permanent(StorageError::from(sdk_error)) } })?; - - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(len); Ok(()) } @@ -423,10 +427,14 @@ impl S3CompatibleObjectStorage { .map_err(StorageError::from) .map_err(Retry::Permanent)?; let md5 = BASE64_STANDARD.encode(part.md5.0); + crate::STORAGE_METRICS.object_storage_put_parts.inc(); crate::STORAGE_METRICS .object_storage_upload_num_bytes .inc_by(part.len()); + let _timer = crate::STORAGE_METRICS + .object_storage_upload_part_requests_duration + .start_timer(); let upload_part_output = self .s3_client @@ -538,14 +546,18 @@ impl S3CompatibleObjectStorage { Ok(()) } - async fn create_get_object_request( + async fn get_object( &self, path: &Path, range_opt: Option>, ) -> Result> { let key = self.key(path); let range_str = range_opt.map(|range| format!("bytes={}-{}", range.start, range.end - 1)); + crate::STORAGE_METRICS.object_storage_get_total.inc(); + let _timer = crate::STORAGE_METRICS + .object_storage_get_requests_duration + .start_timer(); let get_object_output = self .s3_client @@ -565,7 +577,7 @@ impl S3CompatibleObjectStorage { ) -> StorageResult> { let cap = range_opt.as_ref().map(Range::len).unwrap_or(0); let get_object_output = aws_retry(&self.retry_params, || { - self.create_get_object_request(path, range_opt.clone()) + self.get_object_request(path, range_opt) }) .await?; let mut buf: Vec = Vec::with_capacity(cap); @@ -638,6 +650,12 @@ impl S3CompatibleObjectStorage { for (path_chunk, delete) in &mut delete_requests_it { let delete_objects_res: StorageResult = aws_retry(&self.retry_params, || async { + crate::STORAGE_METRICS + .object_storage_batch_delete_total + .inc(); + let _timer = crate::STORAGE_METRICS + .object_storage_batch_delete_requests_duration + .start_timer(); self.s3_client .delete_objects() .bucket(self.bucket.clone()) @@ -752,10 +770,8 @@ impl Storage for S3CompatibleObjectStorage { async fn copy_to(&self, path: &Path, output: &mut dyn SendableAsync) -> StorageResult<()> { let _permit = REQUEST_SEMAPHORE.acquire().await; - let get_object_output = aws_retry(&self.retry_params, || { - self.create_get_object_request(path, None) - }) - .await?; + let get_object_output = + aws_retry(&self.retry_params, || self.get_object_request(path, None)).await?; let mut body_read = BufReader::new(get_object_output.body.into_async_read()); let num_bytes_copied = tokio::io::copy_buf(&mut body_read, output).await?; STORAGE_METRICS @@ -770,6 +786,10 @@ impl Storage for S3CompatibleObjectStorage { let bucket = self.bucket.clone(); let key = self.key(path); let delete_res = aws_retry(&self.retry_params, || async { + crate::STORAGE_METRICS.object_storage_delete_total.inc(); + let _timer = crate::STORAGE_METRICS + .object_storage_delete_requests_duration + .start_timer(); self.s3_client .delete_object() .bucket(&bucket) @@ -818,7 +838,7 @@ impl Storage for S3CompatibleObjectStorage { ) -> crate::StorageResult> { let permit = REQUEST_SEMAPHORE.acquire().await; let get_object_output = aws_retry(&self.retry_params, || { - self.create_get_object_request(path, Some(range.clone())) + self.get_object_request(path, Some(range.clone())) }) .await?; Ok(Box::new(S3AsyncRead {