Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: bump opendal version to 0.49 #4587

Merged
merged 3 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 4 additions & 7 deletions src/common/datasource/src/object_store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use object_store::services::Fs;
use object_store::util::DefaultLoggingInterceptor;
use object_store::ObjectStore;
use snafu::ResultExt;

Expand All @@ -22,13 +23,9 @@ pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
let builder = Fs::default();
let object_store = ObjectStore::new(builder.root(root))
.context(BuildBackendSnafu)?
.layer(
object_store::layers::LoggingLayer::default()
// Print the expected error only in DEBUG level.
// See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level
.with_error_level(Some("debug"))
.expect("input error level must be valid"),
)
.layer(object_store::layers::LoggingLayer::new(
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish();
Expand Down
11 changes: 4 additions & 7 deletions src/common/datasource/src/object_store/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;

use object_store::services::S3;
use object_store::util::DefaultLoggingInterceptor;
use object_store::ObjectStore;
use snafu::ResultExt;

Expand Down Expand Up @@ -84,13 +85,9 @@ pub fn build_s3_backend(
// TODO(weny): Consider finding a better way to eliminate duplicate code.
Ok(ObjectStore::new(builder)
.context(error::BuildBackendSnafu)?
.layer(
object_store::layers::LoggingLayer::default()
// Print the expected error only in DEBUG level.
// See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level
.with_error_level(Some("debug"))
.expect("input error level must be valid"),
)
.layer(object_store::layers::LoggingLayer::new(
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish())
Expand Down
2 changes: 1 addition & 1 deletion src/object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.48", features = [
opendal = { version = "0.49", features = [
"layers-tracing",
"services-azblob",
"services-fs",
Expand Down
109 changes: 100 additions & 9 deletions src/object-store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_telemetry::{debug, error, trace};
use futures::TryStreamExt;
use opendal::layers::{LoggingLayer, TracingLayer};
use opendal::{Entry, Lister};
use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer};
use opendal::raw::{AccessorInfo, Operation};
use opendal::{Entry, ErrorKind, Lister};

use crate::layers::PrometheusMetricsLayer;
use crate::ObjectStore;
Expand Down Expand Up @@ -140,17 +142,106 @@ pub(crate) fn extract_parent_path(path: &str) -> &str {
/// Attaches instrument layers to the object store.
pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
object_store
.layer(
LoggingLayer::default()
// Print the expected error only in DEBUG level.
// See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level
.with_error_level(Some("debug"))
.expect("input error level must be valid"),
)
.layer(LoggingLayer::new(DefaultLoggingInterceptor))
.layer(TracingLayer)
.layer(PrometheusMetricsLayer::new(path_label))
}

static LOGGING_TARGET: &str = "opendal::services";

#[derive(Debug, Copy, Clone, Default)]
pub struct DefaultLoggingInterceptor;

impl LoggingInterceptor for DefaultLoggingInterceptor {
#[inline]
fn log(
&self,
info: &AccessorInfo,
operation: Operation,
context: &[(&str, &str)],
message: &str,
err: Option<&opendal::Error>,
) {
if let Some(err) = err {
// Print error if it's unexpected, otherwise in debug.
if err.kind() == ErrorKind::Unexpected {
error!(
target: LOGGING_TARGET,
"service={} name={} {}: {operation} {message} {}",
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
info.scheme(),
info.name(),
format_args!(
"{}",
context.iter().enumerate().map(|(i, (k, v))| {
if i > 0 {
format!(" {}={}", k, v)
} else {
format!("{}={}", k, v)
}
}).collect::<String>()
),
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
format!("{err:?}")
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
);
} else {
debug!(
target: LOGGING_TARGET,
"service={} name={} {}: {operation} {message} {}",
info.scheme(),
info.name(),
format_args!(
"{}",
context.iter().enumerate().map(|(i, (k, v))| {
if i > 0 {
format!(" {}={}", k, v)
} else {
format!("{}={}", k, v)
}
}).collect::<String>()
),
format!("{err:?}")
);
};
}

// Print debug message if operation is oneshot, otherwise in trace.
if operation.is_oneshot() {
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
debug!(
target: LOGGING_TARGET,
"service={} name={} {}: {operation} {message}",
info.scheme(),
info.name(),
format_args!(
"{}",
context.iter().enumerate().map(|(i, (k, v))| {
if i > 0 {
format!(" {}={}", k, v)
} else {
format!("{}={}", k, v)
}
}).collect::<String>()
),
);
} else {
trace!(
target: LOGGING_TARGET,
"service={} name={} {}: {operation} {message}",
info.scheme(),
info.name(),
format_args!(
"{}",
context.iter().enumerate().map(|(i, (k, v))| {
if i > 0 {
format!(" {}={}", k, v)
} else {
format!("{}={}", k, v)
}
}).collect::<String>()
),
);
};
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down