Skip to content

Commit

Permalink
chore: bump opendal version to 0.49 (#4587)
Browse files Browse the repository at this point in the history
* chore: bump opendal version to 0.49

* chore: apply suggestions from CR

* Update src/object-store/src/util.rs

Co-authored-by: Yingwen <[email protected]>

---------

Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
WenyXu and evenyag authored Aug 22, 2024
1 parent ff04109 commit 0025fa6
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 26 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 4 additions & 7 deletions src/common/datasource/src/object_store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use object_store::services::Fs;
use object_store::util::DefaultLoggingInterceptor;
use object_store::ObjectStore;
use snafu::ResultExt;

Expand All @@ -22,13 +23,9 @@ pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
let builder = Fs::default();
let object_store = ObjectStore::new(builder.root(root))
.context(BuildBackendSnafu)?
.layer(
object_store::layers::LoggingLayer::default()
// Print the expected error only in DEBUG level.
// See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level
.with_error_level(Some("debug"))
.expect("input error level must be valid"),
)
.layer(object_store::layers::LoggingLayer::new(
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish();
Expand Down
11 changes: 4 additions & 7 deletions src/common/datasource/src/object_store/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;

use object_store::services::S3;
use object_store::util::DefaultLoggingInterceptor;
use object_store::ObjectStore;
use snafu::ResultExt;

Expand Down Expand Up @@ -84,13 +85,9 @@ pub fn build_s3_backend(
// TODO(weny): Consider finding a better way to eliminate duplicate code.
Ok(ObjectStore::new(builder)
.context(error::BuildBackendSnafu)?
.layer(
object_store::layers::LoggingLayer::default()
// Print the expected error only in DEBUG level.
// See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level
.with_error_level(Some("debug"))
.expect("input error level must be valid"),
)
.layer(object_store::layers::LoggingLayer::new(
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish())
Expand Down
2 changes: 1 addition & 1 deletion src/object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.48", features = [
opendal = { version = "0.49", features = [
"layers-tracing",
"services-azblob",
"services-fs",
Expand Down
88 changes: 79 additions & 9 deletions src/object-store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;

use common_telemetry::{debug, error, trace};
use futures::TryStreamExt;
use opendal::layers::{LoggingLayer, TracingLayer};
use opendal::{Entry, Lister};
use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer};
use opendal::raw::{AccessorInfo, Operation};
use opendal::{Entry, ErrorKind, Lister};

use crate::layers::PrometheusMetricsLayer;
use crate::ObjectStore;
Expand Down Expand Up @@ -140,17 +144,83 @@ pub(crate) fn extract_parent_path(path: &str) -> &str {
/// Attaches instrument layers to the object store.
pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
object_store
.layer(
LoggingLayer::default()
// Print the expected error only in DEBUG level.
// See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level
.with_error_level(Some("debug"))
.expect("input error level must be valid"),
)
.layer(LoggingLayer::new(DefaultLoggingInterceptor))
.layer(TracingLayer)
.layer(PrometheusMetricsLayer::new(path_label))
}

static LOGGING_TARGET: &str = "opendal::services";

struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl<'a> Display for LoggingContext<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for (i, (k, v)) in self.0.iter().enumerate() {
if i > 0 {
write!(f, " {}={}", k, v)?;
} else {
write!(f, "{}={}", k, v)?;
}
}
Ok(())
}
}

#[derive(Debug, Copy, Clone, Default)]
pub struct DefaultLoggingInterceptor;

impl LoggingInterceptor for DefaultLoggingInterceptor {
#[inline]
fn log(
&self,
info: &AccessorInfo,
operation: Operation,
context: &[(&str, &str)],
message: &str,
err: Option<&opendal::Error>,
) {
if let Some(err) = err {
// Print error if it's unexpected, otherwise in error.
if err.kind() == ErrorKind::Unexpected {
error!(
target: LOGGING_TARGET,
"service={} name={} {}: {operation} {message} {err:#?}",
info.scheme(),
info.name(),
LoggingContext(context),
);
} else {
debug!(
target: LOGGING_TARGET,
"service={} name={} {}: {operation} {message} {err}",
info.scheme(),
info.name(),
LoggingContext(context),
);
};
}

// Print debug message if operation is oneshot, otherwise in trace.
if operation.is_oneshot() {
debug!(
target: LOGGING_TARGET,
"service={} name={} {}: {operation} {message}",
info.scheme(),
info.name(),
LoggingContext(context),
);
} else {
trace!(
target: LOGGING_TARGET,
"service={} name={} {}: {operation} {message}",
info.scheme(),
info.name(),
LoggingContext(context),
);
};
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 0025fa6

Please sign in to comment.