Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record object storage request latencies #5521

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,6 @@ pub fn new_counter(
counter
}

pub fn new_counter_with_labels(
guilload marked this conversation as resolved.
Show resolved Hide resolved
name: &str,
help: &str,
subsystem: &str,
const_labels: &[(&str, &str)],
) -> IntCounter {
let owned_const_labels: HashMap<String, String> = const_labels
.iter()
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
.collect();
let counter_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem)
.const_labels(owned_const_labels);
let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter");
prometheus::register(Box::new(counter.clone())).expect("failed to register counter");
counter
}

pub fn new_counter_vec<const N: usize>(
name: &str,
help: &str,
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Default for SearchMetrics {
),
root_search_request_duration_seconds: new_histogram_vec(
"root_search_request_duration_seconds",
"Duration of root search gRPC request in seconds.",
"Duration of root search gRPC requests in seconds.",
"search",
&[("kind", "server")],
["status"],
Expand All @@ -93,7 +93,7 @@ impl Default for SearchMetrics {
),
leaf_search_request_duration_seconds: new_histogram_vec(
"leaf_search_request_duration_seconds",
"Duration of leaf search gRPC request in seconds.",
"Duration of leaf search gRPC requests in seconds.",
"search",
&[("kind", "server")],
["status"],
Expand Down
49 changes: 42 additions & 7 deletions quickwit/quickwit-storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
new_counter, new_counter_vec, new_counter_with_labels, new_gauge, IntCounter, IntCounterVec,
IntGauge,
new_counter, new_counter_vec, new_gauge, new_histogram_vec, Histogram, IntCounter,
IntCounterVec, IntGauge,
};

/// Counters associated to storage operations.
Expand All @@ -41,6 +41,11 @@ pub struct StorageMetrics {
pub object_storage_put_parts: IntCounter,
pub object_storage_download_num_bytes: IntCounter,
pub object_storage_upload_num_bytes: IntCounter,

pub object_storage_delete_requests_total: IntCounter,
pub object_storage_bulk_delete_requests_total: IntCounter,
pub object_storage_delete_request_duration: Histogram,
pub object_storage_bulk_delete_request_duration: Histogram,
}

impl Default for StorageMetrics {
Expand All @@ -60,6 +65,32 @@ impl Default for StorageMetrics {
];
let get_slice_timeout_all_timeouts =
get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]);

let object_storage_requests_total = new_counter_vec(
"object_storage_requests_total",
"Total number of object storage requests performed.",
"storage",
&[],
["action"],
);
let object_storage_delete_requests_total =
object_storage_requests_total.with_label_values(["delete_object"]);
let object_storage_bulk_delete_requests_total =
object_storage_requests_total.with_label_values(["delete_objects"]);

let object_storage_request_duration = new_histogram_vec(
"object_storage_request_duration_seconds",
"Duration of object storage requests in seconds.",
"storage",
&[],
["action"],
vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0],
);
let object_storage_delete_request_duration =
object_storage_request_duration.with_label_values(["delete_object"]);
let object_storage_bulk_delete_request_duration =
object_storage_request_duration.with_label_values(["delete_objects"]);

StorageMetrics {
fast_field_cache: CacheMetrics::for_component("fastfields"),
fd_cache_metrics: CacheMetrics::for_component("fd"),
Expand Down Expand Up @@ -107,6 +138,10 @@ impl Default for StorageMetrics {
"storage",
&[],
),
object_storage_delete_requests_total,
object_storage_bulk_delete_requests_total,
object_storage_delete_request_duration,
object_storage_bulk_delete_request_duration,
}
}
}
Expand Down Expand Up @@ -141,31 +176,31 @@ impl CacheMetrics {
CACHE_METRICS_NAMESPACE,
&[("component_name", component_name)],
),
hits_num_items: new_counter_with_labels(
hits_num_items: new_counter(
"cache_hits_total",
"Number of cache hits by component",
CACHE_METRICS_NAMESPACE,
&[("component_name", component_name)],
),
hits_num_bytes: new_counter_with_labels(
hits_num_bytes: new_counter(
"cache_hits_bytes",
"Number of cache hits in bytes by component",
CACHE_METRICS_NAMESPACE,
&[("component_name", component_name)],
),
misses_num_items: new_counter_with_labels(
misses_num_items: new_counter(
"cache_misses_total",
"Number of cache misses by component",
CACHE_METRICS_NAMESPACE,
&[("component_name", component_name)],
),
evict_num_items: new_counter_with_labels(
evict_num_items: new_counter(
"cache_evict_total",
"Number of cache entry evicted by component",
CACHE_METRICS_NAMESPACE,
&[("component_name", component_name)],
),
evict_num_bytes: new_counter_with_labels(
evict_num_bytes: new_counter(
"cache_evict_bytes",
"Number of cache entry evicted in bytes by component",
CACHE_METRICS_NAMESPACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,12 @@ impl S3CompatibleObjectStorage {
.byte_stream()
.await
.map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?;

crate::STORAGE_METRICS.object_storage_put_parts.inc();
crate::STORAGE_METRICS
.object_storage_upload_num_bytes
.inc_by(len);

self.s3_client
.put_object()
.bucket(bucket)
Expand All @@ -304,11 +310,6 @@ impl S3CompatibleObjectStorage {
Retry::Permanent(StorageError::from(sdk_error))
}
})?;

crate::STORAGE_METRICS.object_storage_put_parts.inc();
crate::STORAGE_METRICS
.object_storage_upload_num_bytes
.inc_by(len);
Ok(())
}

Expand Down Expand Up @@ -423,6 +424,7 @@ impl S3CompatibleObjectStorage {
.map_err(StorageError::from)
.map_err(Retry::Permanent)?;
let md5 = BASE64_STANDARD.encode(part.md5.0);

crate::STORAGE_METRICS.object_storage_put_parts.inc();
crate::STORAGE_METRICS
.object_storage_upload_num_bytes
Expand All @@ -449,7 +451,7 @@ impl S3CompatibleObjectStorage {
})?;

let completed_part = CompletedPart::builder()
.set_e_tag(upload_part_output.e_tag().map(|tag| tag.to_string()))
.set_e_tag(upload_part_output.e_tag)
.part_number(part.part_number as i32)
.build();
Ok(completed_part)
Expand Down Expand Up @@ -538,13 +540,14 @@ impl S3CompatibleObjectStorage {
Ok(())
}

async fn create_get_object_request(
async fn get_object(
&self,
path: &Path,
range_opt: Option<Range<usize>>,
) -> Result<GetObjectOutput, SdkError<GetObjectError>> {
let key = self.key(path);
let range_str = range_opt.map(|range| format!("bytes={}-{}", range.start, range.end - 1));

crate::STORAGE_METRICS.object_storage_get_total.inc();

let get_object_output = self
Expand All @@ -565,7 +568,7 @@ impl S3CompatibleObjectStorage {
) -> StorageResult<Vec<u8>> {
let cap = range_opt.as_ref().map(Range::len).unwrap_or(0);
let get_object_output = aws_retry(&self.retry_params, || {
self.create_get_object_request(path, range_opt.clone())
self.get_object(path, range_opt.clone())
})
.await?;
let mut buf: Vec<u8> = Vec::with_capacity(cap);
Expand Down Expand Up @@ -638,6 +641,12 @@ impl S3CompatibleObjectStorage {
for (path_chunk, delete) in &mut delete_requests_it {
let delete_objects_res: StorageResult<DeleteObjectsOutput> =
aws_retry(&self.retry_params, || async {
crate::STORAGE_METRICS
.object_storage_bulk_delete_requests_total
.inc();
let _timer = crate::STORAGE_METRICS
.object_storage_bulk_delete_request_duration
.start_timer();
self.s3_client
.delete_objects()
.bucket(self.bucket.clone())
Expand Down Expand Up @@ -752,10 +761,8 @@ impl Storage for S3CompatibleObjectStorage {

async fn copy_to(&self, path: &Path, output: &mut dyn SendableAsync) -> StorageResult<()> {
let _permit = REQUEST_SEMAPHORE.acquire().await;
let get_object_output = aws_retry(&self.retry_params, || {
self.create_get_object_request(path, None)
})
.await?;
let get_object_output =
aws_retry(&self.retry_params, || self.get_object(path, None)).await?;
let mut body_read = BufReader::new(get_object_output.body.into_async_read());
let num_bytes_copied = tokio::io::copy_buf(&mut body_read, output).await?;
STORAGE_METRICS
Expand All @@ -770,6 +777,12 @@ impl Storage for S3CompatibleObjectStorage {
let bucket = self.bucket.clone();
let key = self.key(path);
let delete_res = aws_retry(&self.retry_params, || async {
crate::STORAGE_METRICS
.object_storage_delete_requests_total
.inc();
let _timer = crate::STORAGE_METRICS
.object_storage_delete_request_duration
.start_timer();
self.s3_client
.delete_object()
.bucket(&bucket)
Expand Down Expand Up @@ -818,7 +831,7 @@ impl Storage for S3CompatibleObjectStorage {
) -> crate::StorageResult<Box<dyn AsyncRead + Send + Unpin>> {
let permit = REQUEST_SEMAPHORE.acquire().await;
let get_object_output = aws_retry(&self.retry_params, || {
self.create_get_object_request(path, Some(range.clone()))
self.get_object(path, Some(range.clone()))
})
.await?;
Ok(Box::new(S3AsyncRead {
Expand Down
Loading