From dc8ce28f5267f5ab71ce9f8886c290f544ee3598 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 14 Aug 2024 06:10:01 +0000 Subject: [PATCH 1/3] chore: bump opendal version to 0.49 --- Cargo.lock | 4 +- src/common/datasource/src/object_store/fs.rs | 11 +- src/common/datasource/src/object_store/s3.rs | 11 +- src/object-store/Cargo.toml | 2 +- src/object-store/src/util.rs | 109 +++++++++++++++++-- 5 files changed, 111 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 15e9d65c9fcd..ac4bcd2e8c4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7010,9 +7010,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.48.0" +version = "0.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "615d41187deea0ea7fab5b48e9afef6ae8fc742fdcfa248846ee3d92ff71e986" +checksum = "39d516adf7db912c38af382c3e92c27cd62fbbc240e630920555d784c2ab1494" dependencies = [ "anyhow", "async-trait", diff --git a/src/common/datasource/src/object_store/fs.rs b/src/common/datasource/src/object_store/fs.rs index f21fe46099d5..f87311f517b7 100644 --- a/src/common/datasource/src/object_store/fs.rs +++ b/src/common/datasource/src/object_store/fs.rs @@ -13,6 +13,7 @@ // limitations under the License. use object_store::services::Fs; +use object_store::util::DefaultLoggingInterceptor; use object_store::ObjectStore; use snafu::ResultExt; @@ -22,13 +23,9 @@ pub fn build_fs_backend(root: &str) -> Result { let builder = Fs::default(); let object_store = ObjectStore::new(builder.root(root)) .context(BuildBackendSnafu)? - .layer( - object_store::layers::LoggingLayer::default() - // Print the expected error only in DEBUG level. - // See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level - .with_error_level(Some("debug")) - .expect("input error level must be valid"), - ) + .layer(object_store::layers::LoggingLayer::new( + DefaultLoggingInterceptor, + )) .layer(object_store::layers::TracingLayer) .layer(object_store::layers::PrometheusMetricsLayer::new(true)) .finish(); diff --git a/src/common/datasource/src/object_store/s3.rs b/src/common/datasource/src/object_store/s3.rs index cdba93767745..e141621b899b 100644 --- a/src/common/datasource/src/object_store/s3.rs +++ b/src/common/datasource/src/object_store/s3.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use object_store::services::S3; +use object_store::util::DefaultLoggingInterceptor; use object_store::ObjectStore; use snafu::ResultExt; @@ -84,13 +85,9 @@ pub fn build_s3_backend( // TODO(weny): Consider finding a better way to eliminate duplicate code. Ok(ObjectStore::new(builder) .context(error::BuildBackendSnafu)? - .layer( - object_store::layers::LoggingLayer::default() - // Print the expected error only in DEBUG level. - // See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level - .with_error_level(Some("debug")) - .expect("input error level must be valid"), - ) + .layer(object_store::layers::LoggingLayer::new( + DefaultLoggingInterceptor, + )) .layer(object_store::layers::TracingLayer) .layer(object_store::layers::PrometheusMetricsLayer::new(true)) .finish()) diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index ca2a3a7ab32f..53c65d7d20a7 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -17,7 +17,7 @@ futures.workspace = true lazy_static.workspace = true md5 = "0.7" moka = { workspace = true, features = ["future"] } -opendal = { version = "0.48", features = [ +opendal = { version = "0.49", features = [ "layers-tracing", "services-azblob", "services-fs", diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index c8f7ac893fa4..24ae3cea3fe1 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_telemetry::{debug, error, trace}; use futures::TryStreamExt; -use opendal::layers::{LoggingLayer, TracingLayer}; -use opendal::{Entry, Lister}; +use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer}; +use opendal::raw::{AccessorInfo, Operation}; +use opendal::{Entry, ErrorKind, Lister}; use crate::layers::PrometheusMetricsLayer; use crate::ObjectStore; @@ -140,17 +142,106 @@ pub(crate) fn extract_parent_path(path: &str) -> &str { /// Attaches instrument layers to the object store. pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore { object_store - .layer( - LoggingLayer::default() - // Print the expected error only in DEBUG level. - // See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level - .with_error_level(Some("debug")) - .expect("input error level must be valid"), - ) + .layer(LoggingLayer::new(DefaultLoggingInterceptor)) .layer(TracingLayer) .layer(PrometheusMetricsLayer::new(path_label)) } +static LOGGING_TARGET: &str = "opendal::services"; + +#[derive(Debug, Copy, Clone, Default)] +pub struct DefaultLoggingInterceptor; + +impl LoggingInterceptor for DefaultLoggingInterceptor { + #[inline] + fn log( + &self, + info: &AccessorInfo, + operation: Operation, + context: &[(&str, &str)], + message: &str, + err: Option<&opendal::Error>, + ) { + if let Some(err) = err { + // Print error if it's unexpected, otherwise in debug. + if err.kind() == ErrorKind::Unexpected { + error!( + target: LOGGING_TARGET, + "service={} name={} {}: {operation} {message} {}", + info.scheme(), + info.name(), + format_args!( + "{}", + context.iter().enumerate().map(|(i, (k, v))| { + if i > 0 { + format!(" {}={}", k, v) + } else { + format!("{}={}", k, v) + } + }).collect::() + ), + format!("{err:?}") + ); + } else { + debug!( + target: LOGGING_TARGET, + "service={} name={} {}: {operation} {message} {}", + info.scheme(), + info.name(), + format_args!( + "{}", + context.iter().enumerate().map(|(i, (k, v))| { + if i > 0 { + format!(" {}={}", k, v) + } else { + format!("{}={}", k, v) + } + }).collect::() + ), + format!("{err:?}") + ); + }; + } + + // Print debug message if operation is oneshot, otherwise in trace. + if operation.is_oneshot() { + debug!( + target: LOGGING_TARGET, + "service={} name={} {}: {operation} {message}", + info.scheme(), + info.name(), + format_args!( + "{}", + context.iter().enumerate().map(|(i, (k, v))| { + if i > 0 { + format!(" {}={}", k, v) + } else { + format!("{}={}", k, v) + } + }).collect::() + ), + ); + } else { + trace!( + target: LOGGING_TARGET, + "service={} name={} {}: {operation} {message}", + info.scheme(), + info.name(), + format_args!( + "{}", + context.iter().enumerate().map(|(i, (k, v))| { + if i > 0 { + format!(" {}={}", k, v) + } else { + format!("{}={}", k, v) + } + }).collect::() + ), + ); + }; + } +} + #[cfg(test)] mod tests { use super::*; From 4a527491a5aba9e563dda78d7087260932895735 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 20 Aug 2024 06:31:06 +0000 Subject: [PATCH 2/3] chore: apply suggestions from CR --- src/object-store/src/util.rs | 69 +++++++++++++----------------------- 1 file changed, 24 insertions(+), 45 deletions(-) diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index 24ae3cea3fe1..832461216045 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Display; + use common_telemetry::{debug, error, trace}; use futures::TryStreamExt; use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer}; @@ -149,6 +151,21 @@ pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> Ob static LOGGING_TARGET: &str = "opendal::services"; +struct LoggingContext<'a>(&'a [(&'a str, &'a str)]); + +impl<'a> Display for LoggingContext<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for (i, (k, v)) in self.0.iter().enumerate() { + if i > 0 { + write!(f, " {}={}", k, v)?; + } else { + write!(f, "{}={}", k, v)?; + } + } + Ok(()) + } +} + #[derive(Debug, Copy, Clone, Default)] pub struct DefaultLoggingInterceptor; @@ -163,42 +180,22 @@ impl LoggingInterceptor for DefaultLoggingInterceptor { err: Option<&opendal::Error>, ) { if let Some(err) = err { - // Print error if it's unexpected, otherwise in debug. + // Print error if it's unexpected, otherwise in warn. if err.kind() == ErrorKind::Unexpected { error!( target: LOGGING_TARGET, - "service={} name={} {}: {operation} {message} {}", + "service={} name={} {}: {operation} {message} {err:#?}", info.scheme(), info.name(), - format_args!( - "{}", - context.iter().enumerate().map(|(i, (k, v))| { - if i > 0 { - format!(" {}={}", k, v) - } else { - format!("{}={}", k, v) - } - }).collect::() - ), - format!("{err:?}") + LoggingContext(context), ); } else { debug!( target: LOGGING_TARGET, - "service={} name={} {}: {operation} {message} {}", + "service={} name={} {}: {operation} {message} {err}", info.scheme(), info.name(), - format_args!( - "{}", - context.iter().enumerate().map(|(i, (k, v))| { - if i > 0 { - format!(" {}={}", k, v) - } else { - format!("{}={}", k, v) - } - }).collect::() - ), - format!("{err:?}") + LoggingContext(context), ); }; } @@ -210,16 +207,7 @@ impl LoggingInterceptor for DefaultLoggingInterceptor { "service={} name={} {}: {operation} {message}", info.scheme(), info.name(), - format_args!( - "{}", - context.iter().enumerate().map(|(i, (k, v))| { - if i > 0 { - format!(" {}={}", k, v) - } else { - format!("{}={}", k, v) - } - }).collect::() - ), + LoggingContext(context), ); } else { trace!( @@ -227,16 +215,7 @@ impl LoggingInterceptor for DefaultLoggingInterceptor { "service={} name={} {}: {operation} {message}", info.scheme(), info.name(), - format_args!( - "{}", - context.iter().enumerate().map(|(i, (k, v))| { - if i > 0 { - format!(" {}={}", k, v) - } else { - format!("{}={}", k, v) - } - }).collect::() - ), + LoggingContext(context), ); }; } From 79650e9ac0be49067fc2fc00e6ed3fed0477bd6d Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 20 Aug 2024 14:40:16 +0800 Subject: [PATCH 3/3] Update src/object-store/src/util.rs Co-authored-by: Yingwen --- src/object-store/src/util.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index 832461216045..e54b67c15263 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -180,7 +180,7 @@ impl LoggingInterceptor for DefaultLoggingInterceptor { err: Option<&opendal::Error>, ) { if let Some(err) = err { - // Print error if it's unexpected, otherwise in warn. + // Print error if it's unexpected, otherwise in error. if err.kind() == ErrorKind::Unexpected { error!( target: LOGGING_TARGET,