Skip to content

Commit

Permalink
fix: move object store read/write timer into inner (#3627)
Browse files Browse the repository at this point in the history
* fix: move object store read/write timer into inner

* add Drop for PrometheusMetricWrapper

* call await on async read/write

* apply review comments

* git rid of option on timer
  • Loading branch information
dimbtp authored Apr 3, 2024
1 parent ddeb73f commit 86d377d
Showing 1 changed file with 95 additions and 70 deletions.
165 changes: 95 additions & 70 deletions src/object-store/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ use std::task::{Context, Poll};
use async_trait::async_trait;
use bytes::Bytes;
use common_telemetry::debug;
use futures::{FutureExt, TryFutureExt};
use futures::FutureExt;
use lazy_static::lazy_static;
use opendal::raw::*;
use opendal::ErrorKind;
use prometheus::{
exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec,
HistogramVec, IntCounterVec,
Histogram, HistogramTimer, HistogramVec, IntCounterVec,
};

type Result<T> = std::result::Result<T, opendal::Error>;
Expand Down Expand Up @@ -157,23 +157,27 @@ impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
.with_label_values(&[&self.scheme, Operation::Read.into_static()])
.start_timer();

let read_res = self
.inner
self.inner
.read(path, args)
.map(|v| {
v.map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(r, Operation::Read, &self.scheme),
PrometheusMetricWrapper::new(
r,
Operation::Read,
BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::Read.into_static()]),
timer,
),
)
})
})
.await;
timer.observe_duration();
read_res.map_err(|e| {
increment_errors_total(Operation::Read, e.kind());
e
})
.await
.map_err(|e| {
increment_errors_total(Operation::Read, e.kind());
e
})
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Expand All @@ -185,23 +189,27 @@ impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
.with_label_values(&[&self.scheme, Operation::Write.into_static()])
.start_timer();

let write_res = self
.inner
self.inner
.write(path, args)
.map(|v| {
v.map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(r, Operation::Write, &self.scheme),
PrometheusMetricWrapper::new(
r,
Operation::Write,
BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::Write.into_static()]),
timer,
),
)
})
})
.await;
timer.observe_duration();
write_res.map_err(|e| {
increment_errors_total(Operation::Write, e.kind());
e
})
.await
.map_err(|e| {
increment_errors_total(Operation::Write, e.kind());
e
})
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
Expand All @@ -212,13 +220,7 @@ impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
.with_label_values(&[&self.scheme, Operation::Stat.into_static()])
.start_timer();

let stat_res = self
.inner
.stat(path, args)
.inspect_err(|e| {
increment_errors_total(Operation::Stat, e.kind());
})
.await;
let stat_res = self.inner.stat(path, args).await;
timer.observe_duration();
stat_res.map_err(|e| {
increment_errors_total(Operation::Stat, e.kind());
Expand Down Expand Up @@ -321,17 +323,27 @@ impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
.start_timer();
let result = self.inner.blocking_read(path, args).map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(r, Operation::BlockingRead, &self.scheme),
)
});
timer.observe_duration();
result.map_err(|e| {
increment_errors_total(Operation::BlockingRead, e.kind());
e
})

self.inner
.blocking_read(path, args)
.map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(
r,
Operation::BlockingRead,
BYTES_TOTAL.with_label_values(&[
&self.scheme,
Operation::BlockingRead.into_static(),
]),
timer,
),
)
})
.map_err(|e| {
increment_errors_total(Operation::BlockingRead, e.kind());
e
})
}

fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
Expand All @@ -342,17 +354,27 @@ impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
.start_timer();
let result = self.inner.blocking_write(path, args).map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(r, Operation::BlockingWrite, &self.scheme),
)
});
timer.observe_duration();
result.map_err(|e| {
increment_errors_total(Operation::BlockingWrite, e.kind());
e
})

self.inner
.blocking_write(path, args)
.map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(
r,
Operation::BlockingWrite,
BYTES_TOTAL.with_label_values(&[
&self.scheme,
Operation::BlockingWrite.into_static(),
]),
timer,
),
)
})
.map_err(|e| {
increment_errors_total(Operation::BlockingWrite, e.kind());
e
})
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
Expand Down Expand Up @@ -410,15 +432,30 @@ pub struct PrometheusMetricWrapper<R> {
inner: R,

op: Operation,
scheme: String,
bytes_counter: Histogram,
_requests_duration_timer: HistogramTimer,
bytes: u64,
}

impl<R> Drop for PrometheusMetricWrapper<R> {
fn drop(&mut self) {
self.bytes_counter.observe(self.bytes as f64);
}
}

impl<R> PrometheusMetricWrapper<R> {
fn new(inner: R, op: Operation, scheme: &String) -> Self {
fn new(
inner: R,
op: Operation,
bytes_counter: Histogram,
requests_duration_timer: HistogramTimer,
) -> Self {
Self {
inner,
op,
scheme: scheme.to_string(),
bytes_counter,
_requests_duration_timer: requests_duration_timer,
bytes: 0,
}
}
}
Expand All @@ -427,9 +464,7 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
self.inner.poll_read(cx, buf).map(|res| match res {
Ok(bytes) => {
BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::Read.into_static()])
.observe(bytes as f64);
self.bytes += bytes as u64;
Ok(bytes)
}
Err(e) => {
Expand All @@ -452,9 +487,7 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
self.inner.poll_next(cx).map(|res| match res {
Some(Ok(bytes)) => {
BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::Read.into_static()])
.observe(bytes.len() as f64);
self.bytes += bytes.len() as u64;
Some(Ok(bytes))
}
Some(Err(e)) => {
Expand All @@ -471,9 +504,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
self.inner
.read(buf)
.map(|n| {
BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
.observe(n as f64);
self.bytes += n as u64;
n
})
.map_err(|e| {
Expand All @@ -492,9 +523,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
fn next(&mut self) -> Option<Result<Bytes>> {
self.inner.next().map(|res| match res {
Ok(bytes) => {
BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
.observe(bytes.len() as f64);
self.bytes += bytes.len() as u64;
Ok(bytes)
}
Err(e) => {
Expand All @@ -511,9 +540,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
self.inner
.poll_write(cx, bs)
.map_ok(|n| {
BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::Write.into_static()])
.observe(n as f64);
self.bytes += n as u64;
n
})
.map_err(|err| {
Expand Down Expand Up @@ -542,9 +569,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
self.inner
.write(bs)
.map(|n| {
BYTES_TOTAL
.with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
.observe(n as f64);
self.bytes += n as u64;
n
})
.map_err(|err| {
Expand Down

0 comments on commit 86d377d

Please sign in to comment.