From 530353785cd632faa6ec71552d25ed9da137cb0b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 6 May 2024 21:26:01 +0800 Subject: [PATCH] refactor: remove re-export from logging (#3865) * refactor: remove re-export from logging Signed-off-by: Ruihang Xia * fix merge problem Signed-off-by: Ruihang Xia * run formatter Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- .../src/information_schema/cluster_info.rs | 2 +- src/cmd/src/cli/repl.rs | 6 ++-- src/cmd/src/datanode.rs | 6 ++-- src/cmd/src/frontend.rs | 6 ++-- src/cmd/src/metasrv.rs | 6 ++-- .../function/src/table/migrate_region.rs | 2 +- src/common/procedure/src/local.rs | 24 +++++++-------- src/common/procedure/src/local/runner.rs | 30 +++++++++---------- src/common/procedure/src/store.rs | 16 +++++----- src/common/runtime/src/repeated_task.rs | 11 ++++--- src/common/telemetry/src/logging.rs | 1 - src/common/telemetry/src/macros.rs | 2 +- src/datanode/src/store.rs | 2 +- src/datanode/src/store/azblob.rs | 2 +- src/datanode/src/store/fs.rs | 2 +- src/datanode/src/store/gcs.rs | 2 +- src/datanode/src/store/oss.rs | 2 +- src/datanode/src/store/s3.rs | 2 +- src/datatypes/src/value.rs | 4 +-- src/frontend/src/instance/prom_store.rs | 4 +-- .../create/sort/external_sort.rs | 6 ++-- src/log-store/src/config.rs | 2 +- src/mito2/src/schedule/scheduler.rs | 4 +-- src/object-store/src/layers/lru_cache.rs | 2 +- .../src/layers/lru_cache/read_cache.rs | 2 +- src/object-store/tests/object_store_test.rs | 20 ++++++------- src/operator/src/request.rs | 2 +- src/script/src/manager.rs | 6 ++-- src/script/src/table.rs | 13 ++++---- src/servers/src/grpc.rs | 3 +- src/servers/src/grpc/greptime_handler.rs | 6 ++-- src/servers/src/http.rs | 2 +- src/servers/src/http/error_result.rs | 2 +- src/servers/src/http/pprof.rs | 5 ++-- src/servers/src/mysql/handler.rs | 6 ++-- src/servers/src/server.rs | 2 +- tests-integration/src/database.rs | 4 +-- tests-integration/src/tests/instance_test.rs | 13 ++++---- 38 files changed, 112 insertions(+), 120 deletions(-) diff --git a/src/catalog/src/information_schema/cluster_info.rs b/src/catalog/src/information_schema/cluster_info.rs index 0f01852bb541..2a55810876e4 100644 --- a/src/catalog/src/information_schema/cluster_info.rs +++ b/src/catalog/src/information_schema/cluster_info.rs @@ -24,7 +24,7 @@ use common_meta::peer::Peer; use common_query::physical_plan::TaskContext; use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; -use common_telemetry::logging::warn; +use common_telemetry::warn; use common_time::timestamp::Timestamp; use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index edd174699cc7..b13bb776f856 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -26,7 +26,7 @@ use common_error::ext::ErrorExt; use common_meta::cache_invalidator::MultiCacheInvalidator; use common_query::Output; use common_recordbatch::RecordBatches; -use common_telemetry::logging; +use common_telemetry::debug; use either::Either; use meta_client::client::MetaClientBuilder; use query::datafusion::DatafusionQueryEngine; @@ -78,7 +78,7 @@ impl Repl { let history_file = history_file(); if let Err(e) = rl.load_history(&history_file) { - logging::debug!( + debug!( "failed to load history file on {}, error: {e}", history_file.display() ); @@ -225,7 
+225,7 @@ impl Drop for Repl { if self.rl.helper().is_some() { let history_file = history_file(); if let Err(e) = self.rl.save_history(&history_file) { - logging::debug!( + debug!( "failed to save history file on {}, error: {e}", history_file.display() ); diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index ff0d90409023..58167024b2c9 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -18,7 +18,7 @@ use std::time::Duration; use async_trait::async_trait; use catalog::kvbackend::MetaKvBackend; use clap::Parser; -use common_telemetry::{info, logging}; +use common_telemetry::info; use common_wal::config::DatanodeWalConfig; use datanode::config::DatanodeOptions; use datanode::datanode::{Datanode, DatanodeBuilder}; @@ -210,8 +210,8 @@ impl StartCommand { .await .context(StartDatanodeSnafu)?; - logging::info!("Datanode start command: {:#?}", self); - logging::info!("Datanode options: {:#?}", opts); + info!("Datanode start command: {:#?}", self); + info!("Datanode options: {:#?}", opts); let node_id = opts .node_id diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 13cba86191d0..021596cb3dc4 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -22,7 +22,7 @@ use client::client_manager::DatanodeClients; use common_meta::cache_invalidator::MultiCacheInvalidator; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; -use common_telemetry::logging; +use common_telemetry::info; use common_time::timezone::set_default_timezone; use frontend::frontend::FrontendOptions; use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; @@ -219,8 +219,8 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - logging::info!("Frontend start command: {:#?}", self); - logging::info!("Frontend options: {:#?}", opts); + info!("Frontend start command: {:#?}", self); + info!("Frontend options: {:#?}", opts); set_default_timezone(opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?; diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index bc542ada3024..d83f7a460e3e 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -16,7 +16,7 @@ use std::time::Duration; use async_trait::async_trait; use clap::Parser; -use common_telemetry::logging; +use common_telemetry::info; use meta_srv::bootstrap::MetasrvInstance; use meta_srv::metasrv::MetasrvOptions; use snafu::ResultExt; @@ -198,8 +198,8 @@ impl StartCommand { .await .context(StartMetaServerSnafu)?; - logging::info!("Metasrv start command: {:#?}", self); - logging::info!("Metasrv options: {:#?}", opts); + info!("Metasrv start command: {:#?}", self); + info!("Metasrv options: {:#?}", opts); let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None) .await diff --git a/src/common/function/src/table/migrate_region.rs b/src/common/function/src/table/migrate_region.rs index 85692792ce3b..b208b258d7c0 100644 --- a/src/common/function/src/table/migrate_region.rs +++ b/src/common/function/src/table/migrate_region.rs @@ -20,7 +20,7 @@ use common_meta::rpc::procedure::MigrateRegionRequest; use common_query::error::Error::ThreadJoin; use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result}; use common_query::prelude::{Signature, TypeSignature, Volatility}; -use common_telemetry::logging::error; +use common_telemetry::error; use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; use 
datatypes::value::{Value, ValueRef}; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 4e13a82b240b..d52d48d988d1 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -24,7 +24,7 @@ use async_trait::async_trait; use backon::ExponentialBuilder; use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; -use common_telemetry::{info, logging, tracing}; +use common_telemetry::{error, info, tracing}; use snafu::{ensure, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::{Mutex as TokioMutex, Notify}; @@ -244,20 +244,18 @@ impl ManagerContext { ) -> Option<LoadedProcedure> { let loaders = self.loaders.lock().unwrap(); let loader = loaders.get(&message.type_name).or_else(|| { - logging::error!( + error!( "Loader not found, procedure_id: {}, type_name: {}", - procedure_id, - message.type_name + procedure_id, message.type_name ); None })?; let procedure = loader(&message.data) .map_err(|e| { - logging::error!( + error!( "Failed to load procedure data, key: {}, source: {:?}", - procedure_id, - e + procedure_id, e ); e }) @@ -496,7 +494,7 @@ impl LocalManager { continue; }; - logging::info!( + info!( "Recover root procedure {}-{}, step: {}", loaded_procedure.procedure.type_name(), procedure_id, @@ -521,7 +519,7 @@ impl LocalManager { loaded_procedure.step, loaded_procedure.procedure, ) { - logging::error!(e; "Failed to recover procedure {}", procedure_id); + error!(e; "Failed to recover procedure {}", procedure_id); } } } @@ -529,7 +527,7 @@ impl LocalManager { /// Recovers unfinished procedures and reruns them. async fn recover(&self) -> Result<()> { - logging::info!("LocalManager start to recover"); + info!("LocalManager start to recover"); let recover_start = Instant::now(); let (messages, rollback_messages, finished_ids) = @@ -539,19 +537,19 @@ impl LocalManager { self.submit_recovered_messages(messages, InitProcedureState::Running); if !finished_ids.is_empty() { - logging::info!( + info!( "LocalManager try to clean finished procedures, num: {}", finished_ids.len() ); for procedure_id in finished_ids { if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await { - logging::error!(e; "Failed to delete procedure {}", procedure_id); + error!(e; "Failed to delete procedure {}", procedure_id); } } } - logging::info!( + info!( "LocalManager finish recovery, cost: {}ms", recover_start.elapsed().as_millis() ); diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index ec4013467784..0e0ac2020a7a 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::time::Duration; use backon::{BackoffBuilder, ExponentialBuilder}; -use common_telemetry::logging; +use common_telemetry::{debug, error, info}; use tokio::time; use super::rwlock::OwnedKeyRwLockGuard; @@ -54,7 +54,7 @@ impl ProcedureGuard { impl Drop for ProcedureGuard { fn drop(&mut self) { if !self.finish { - logging::error!("Procedure {} exits unexpectedly", self.meta.id); + error!("Procedure {} exits unexpectedly", self.meta.id); // Set state to failed. This is useful in test as runtime may not abort when the runner task panics. // See https://github.com/tokio-rs/tokio/issues/2002 . @@ -104,7 +104,7 @@ impl Runner { // Ensure we can update the procedure state.
let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone()); - logging::info!( + info!( "Runner {}-{} starts", self.procedure.type_name(), self.meta.id @@ -149,7 +149,7 @@ impl Runner { for id in procedure_ids { if let Err(e) = self.store.delete_procedure(id).await { - logging::error!( + error!( e; "Runner {}-{} failed to delete procedure {}", self.procedure.type_name(), @@ -160,7 +160,7 @@ impl Runner { } } - logging::info!( + info!( "Runner {}-{} exits", self.procedure.type_name(), self.meta.id @@ -260,7 +260,7 @@ impl Runner { ProcedureState::Running | ProcedureState::Retrying { .. } => { match self.procedure.execute(ctx).await { Ok(status) => { - logging::debug!( + debug!( "Execute procedure {}-{} once, status: {:?}, need_persist: {}", self.procedure.type_name(), self.meta.id, @@ -299,7 +299,7 @@ impl Runner { } } Err(e) => { - logging::error!( + error!( e; "Failed to execute procedure {}-{}, retry: {}", self.procedure.type_name(), @@ -394,7 +394,7 @@ impl Runner { /// Extend the retry time to wait for the next retry. async fn wait_on_err(&mut self, d: Duration, i: u64) { - logging::info!( + info!( "Procedure {}-{} retry for the {} times after {} millis", self.procedure.type_name(), self.meta.id, @@ -407,7 +407,7 @@ impl Runner { async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) { let has_child = !subprocedures.is_empty(); for subprocedure in subprocedures { - logging::info!( + info!( "Procedure {}-{} submit subprocedure {}-{}", self.procedure.type_name(), self.meta.id, @@ -422,7 +422,7 @@ impl Runner { ); } - logging::info!( + info!( "Procedure {}-{} is waiting for subprocedures", self.procedure.type_name(), self.meta.id, @@ -432,7 +432,7 @@ impl Runner { if has_child { self.meta.child_notify.notified().await; - logging::info!( + info!( "Procedure {}-{} is waked up", self.procedure.type_name(), self.meta.id, @@ -454,7 +454,7 @@ impl Runner { ) .await .map_err(|e| { - logging::error!( + error!( e; "Failed to persist procedure {}-{}", self.procedure.type_name(), self.meta.id @@ -470,7 +470,7 @@ impl Runner { .commit_procedure(self.meta.id, self.step) .await .map_err(|e| { - logging::error!( + error!( e; "Failed to commit procedure {}-{}", self.procedure.type_name(), self.meta.id @@ -496,7 +496,7 @@ impl Runner { .rollback_procedure(self.meta.id, message) .await .map_err(|e| { - logging::error!( + error!( e; "Failed to write rollback key for procedure {}-{}", self.procedure.type_name(), self.meta.id @@ -509,7 +509,7 @@ impl Runner { fn done(&self, output: Option<Output>) { // TODO(yingwen): Add files to remove list. - logging::info!( + info!( "Procedure {}-{} done", self.procedure.type_name(), self.meta.id, diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index cc96ed9f8032..d25df4b66a80 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::fmt; -use common_telemetry::logging; +use common_telemetry::{debug, error, info, warn}; use futures::TryStreamExt; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -65,7 +65,7 @@ impl ProcedureStore { /// Creates a new [ProcedureStore] from specific [StateStoreRef].
pub(crate) fn new(parent_path: &str, store: StateStoreRef) -> ProcedureStore { let proc_path = format!("{}{PROC_PATH}", parent_path); - logging::info!("The procedure state store path is: {}", &proc_path); + info!("The procedure state store path is: {}", &proc_path); ProcedureStore { proc_path, store } } @@ -154,7 +154,7 @@ impl ProcedureStore { while let Some((key_set, _)) = key_values.try_next().await? { let key = key_set.key(); let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else { - logging::warn!("Unknown key while deleting procedures, key: {}", key); + warn!("Unknown key while deleting procedures, key: {}", key); continue; }; if curr_key.key_type == KeyType::Step { @@ -165,11 +165,9 @@ impl ProcedureStore { } } - logging::debug!( + debug!( "Delete keys for procedure {}, step_keys: {:?}, finish_keys: {:?}", - procedure_id, - step_keys, - finish_keys + procedure_id, step_keys, finish_keys ); // We delete all step keys first. self.store.batch_delete(step_keys.as_slice()).await?; @@ -203,7 +201,7 @@ impl ProcedureStore { while let Some((key_set, value)) = key_values.try_next().await? { let key = key_set.key(); let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else { - logging::warn!("Unknown key while loading procedures, key: {}", key); + warn!("Unknown key while loading procedures, key: {}", key); continue; }; @@ -251,7 +249,7 @@ impl ProcedureStore { serde_json::from_slice(value) .map_err(|e| { // `e` doesn't impl ErrorExt so we print it as normal error. - logging::error!("Failed to parse value, key: {:?}, source: {:?}", key, e); + error!("Failed to parse value, key: {:?}, source: {:?}", key, e); e }) .ok() diff --git a/src/common/runtime/src/repeated_task.rs b/src/common/runtime/src/repeated_task.rs index a4f2bde8b00a..5818e1def8f2 100644 --- a/src/common/runtime/src/repeated_task.rs +++ b/src/common/runtime/src/repeated_task.rs @@ -17,7 +17,7 @@ use std::sync::Mutex; use std::time::Duration; use common_error::ext::ErrorExt; -use common_telemetry::logging; +use common_telemetry::{debug, error}; use snafu::{ensure, ResultExt}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -128,17 +128,16 @@ impl RepeatedTask { } } if let Err(e) = task_fn.call().await { - logging::error!(e; "Failed to run repeated task: {}", task_fn.name()); + error!(e; "Failed to run repeated task: {}", task_fn.name()); } } }); inner.task_handle = Some(handle); self.started.store(true, Ordering::Relaxed); - logging::debug!( + debug!( "Repeated task {} started with interval: {:?}", - self.name, - self.interval + self.name, self.interval ); Ok(()) @@ -162,7 +161,7 @@ impl RepeatedTask { .await .context(WaitGcTaskStopSnafu { name: &self.name })?; - logging::debug!("Repeated task {} stopped", self.name); + debug!("Repeated task {} stopped", self.name); Ok(()) } diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index e4e9479a699d..d281d81d61cc 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -32,7 +32,6 @@ use tracing_subscriber::prelude::*; use tracing_subscriber::{filter, EnvFilter, Registry}; use crate::tracing_sampler::{create_sampler, TracingSampleOptions}; -pub use crate::{debug, error, info, trace, warn}; const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317"; diff --git a/src/common/telemetry/src/macros.rs b/src/common/telemetry/src/macros.rs index 9ca91354a5df..cb838db6fef6 100644 --- a/src/common/telemetry/src/macros.rs +++ b/src/common/telemetry/src/macros.rs @@ -213,7 
+213,7 @@ mod tests { #[test] fn test_log_error() { - crate::logging::init_default_ut_logging(); + crate::init_default_ut_logging(); let err = MockError::new(StatusCode::Unknown); let err_ref = &err; diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index e748aa5a2164..8b778306c466 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -25,7 +25,7 @@ use std::time::Duration; use std::{env, path}; use common_base::readable_size::ReadableSize; -use common_telemetry::logging::info; +use common_telemetry::info; use object_store::layers::{LruCacheLayer, RetryLayer}; use object_store::services::Fs; use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; diff --git a/src/datanode/src/store/azblob.rs b/src/datanode/src/store/azblob.rs index 0154567540ce..156c2897ec9d 100644 --- a/src/datanode/src/store/azblob.rs +++ b/src/datanode/src/store/azblob.rs @@ -13,7 +13,7 @@ // limitations under the License. use common_base::secrets::ExposeSecret; -use common_telemetry::logging::info; +use common_telemetry::info; use object_store::services::Azblob; use object_store::{util, ObjectStore}; use snafu::prelude::*; diff --git a/src/datanode/src/store/fs.rs b/src/datanode/src/store/fs.rs index e3a8513c6dd2..607598841261 100644 --- a/src/datanode/src/store/fs.rs +++ b/src/datanode/src/store/fs.rs @@ -14,7 +14,7 @@ use std::{fs, path}; -use common_telemetry::logging::info; +use common_telemetry::info; use object_store::services::Fs; use object_store::util::join_dir; use object_store::ObjectStore; diff --git a/src/datanode/src/store/gcs.rs b/src/datanode/src/store/gcs.rs index a7a808cdf338..e0a8c6a315a7 100644 --- a/src/datanode/src/store/gcs.rs +++ b/src/datanode/src/store/gcs.rs @@ -13,7 +13,7 @@ // limitations under the License. use common_base::secrets::ExposeSecret; -use common_telemetry::logging::info; +use common_telemetry::info; use object_store::services::Gcs; use object_store::{util, ObjectStore}; use snafu::prelude::*; diff --git a/src/datanode/src/store/oss.rs b/src/datanode/src/store/oss.rs index 705d3c53d883..b807a991970a 100644 --- a/src/datanode/src/store/oss.rs +++ b/src/datanode/src/store/oss.rs @@ -13,7 +13,7 @@ // limitations under the License. use common_base::secrets::ExposeSecret; -use common_telemetry::logging::info; +use common_telemetry::info; use object_store::services::Oss; use object_store::{util, ObjectStore}; use snafu::prelude::*; diff --git a/src/datanode/src/store/s3.rs b/src/datanode/src/store/s3.rs index 27197b1bb1eb..bf7c0bfe14a8 100644 --- a/src/datanode/src/store/s3.rs +++ b/src/datanode/src/store/s3.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use common_base::secrets::ExposeSecret; -use common_telemetry::logging::info; +use common_telemetry::info; use object_store::services::S3; use object_store::{util, ObjectStore}; use snafu::prelude::*; diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 01c1983f32b4..9c9b79a4a6a9 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -20,7 +20,7 @@ use arrow::datatypes::{DataType as ArrowDataType, Field}; use arrow_array::{Array, ListArray}; use common_base::bytes::{Bytes, StringBytes}; use common_decimal::Decimal128; -use common_telemetry::logging; +use common_telemetry::error; use common_time::date::Date; use common_time::datetime::DateTime; use common_time::interval::IntervalUnit; @@ -487,7 +487,7 @@ pub fn scalar_value_to_timestamp( ScalarValue::Utf8(Some(s)) => match Timestamp::from_str(s, timezone) { Ok(t) => Some(t), Err(e) => { - logging::error!(e;"Failed to convert string literal {s} to timestamp"); + error!(e;"Failed to convert string literal {s} to timestamp"); None } }, diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 8821f064f0bd..5055d5ac2215 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -26,7 +26,7 @@ use common_error::ext::BoxedError; use common_query::prelude::GREPTIME_PHYSICAL_TABLE; use common_query::Output; use common_recordbatch::RecordBatches; -use common_telemetry::{logging, tracing}; +use common_telemetry::{debug, tracing}; use operator::insert::InserterRef; use operator::statement::StatementExecutor; use prost::Message; @@ -119,7 +119,7 @@ impl Instance { let logical_plan = prom_store::query_to_plan(dataframe, query).context(PromStoreRemoteQueryPlanSnafu)?; - logging::debug!( + debug!( "Prometheus remote read, table: {}, logical plan: {}", table_name, logical_plan.display_indent(), diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs index cea2820c75a9..f43e3df20426 100644 --- a/src/index/src/inverted_index/create/sort/external_sort.rs +++ b/src/index/src/inverted_index/create/sort/external_sort.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_base::BitVec; -use common_telemetry::logging; +use common_telemetry::{debug, error}; use futures::stream; use crate::inverted_index::create::sort::external_provider::ExternalTempFileProvider; @@ -254,9 +254,9 @@ impl ExternalSorter { let entries = values.len(); IntermediateWriter::new(writer).write_all(values, bitmap_leading_zeros as _).await.inspect(|_| - logging::debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}") + debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}") ).inspect_err(|e| - logging::error!("Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}. Error: {e}") + error!("Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}. 
Error: {e}") ) } diff --git a/src/log-store/src/config.rs b/src/log-store/src/config.rs index 1f195f6a2766..2663e16b3264 100644 --- a/src/log-store/src/config.rs +++ b/src/log-store/src/config.rs @@ -47,7 +47,7 @@ mod tests { #[test] pub fn test_default_config() { - common_telemetry::logging::init_default_ut_logging(); + common_telemetry::common_telemetry::init_default_ut_logging(); let default = LogConfig::default(); info!("LogConfig::default(): {:?}", default); assert_eq!(1024 * 1024 * 1024, default.file_size); diff --git a/src/mito2/src/schedule/scheduler.rs b/src/mito2/src/schedule/scheduler.rs index 2edb48e6411b..9c2d5c20ab06 100644 --- a/src/mito2/src/schedule/scheduler.rs +++ b/src/mito2/src/schedule/scheduler.rs @@ -17,7 +17,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::{Arc, RwLock}; -use common_telemetry::logging; +use common_telemetry::warn; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::Mutex; use tokio::task::JoinHandle; @@ -149,7 +149,7 @@ impl Scheduler for LocalScheduler { impl Drop for LocalScheduler { fn drop(&mut self) { if self.state.load(Ordering::Relaxed) != STATE_STOP { - logging::warn!("scheduler should be stopped before dropping, which means the state of scheduler must be STATE_STOP"); + warn!("scheduler should be stopped before dropping, which means the state of scheduler must be STATE_STOP"); // We didn't call `stop()` so we cancel all background workers here. self.sender.write().unwrap().take(); diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index 67010f025760..70d20710cbbb 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -22,7 +22,7 @@ use opendal::raw::{ }; use opendal::Result; mod read_cache; -use common_telemetry::logging::info; +use common_telemetry::info; use read_cache::ReadCache; /// An opendal layer with local LRU file cache supporting. 
diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index f5bbb9ff866a..f37e4772c31c 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use common_telemetry::logging::debug; +use common_telemetry::debug; use futures::FutureExt; use moka::future::Cache; use moka::notification::ListenerFuture; diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index f5371a4a16c5..cfe0afe027db 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -16,7 +16,7 @@ use std::env; use std::sync::Arc; use anyhow::Result; -use common_telemetry::logging; +use common_telemetry::info; use common_test_util::temp_dir::create_temp_dir; use object_store::layers::LruCacheLayer; use object_store::services::{Fs, S3}; @@ -109,10 +109,10 @@ async fn test_fs_backend() -> Result<()> { #[tokio::test] async fn test_s3_backend() -> Result<()> { - logging::init_default_ut_logging(); + common_telemetry::init_default_ut_logging(); if let Ok(bucket) = env::var("GT_S3_BUCKET") { if !bucket.is_empty() { - logging::info!("Running s3 test."); + info!("Running s3 test."); let root = uuid::Uuid::new_v4().to_string(); @@ -138,10 +138,10 @@ async fn test_s3_backend() -> Result<()> { #[tokio::test] async fn test_oss_backend() -> Result<()> { - logging::init_default_ut_logging(); + common_telemetry::init_default_ut_logging(); if let Ok(bucket) = env::var("GT_OSS_BUCKET") { if !bucket.is_empty() { - logging::info!("Running oss test."); + info!("Running oss test."); let root = uuid::Uuid::new_v4().to_string(); @@ -166,10 +166,10 @@ async fn test_oss_backend() -> Result<()> { #[tokio::test] async fn test_azblob_backend() -> Result<()> { - logging::init_default_ut_logging(); + common_telemetry::init_default_ut_logging(); if let Ok(container) = env::var("GT_AZBLOB_CONTAINER") { if !container.is_empty() { - logging::info!("Running azblob test."); + info!("Running azblob test."); let root = uuid::Uuid::new_v4().to_string(); @@ -193,10 +193,10 @@ async fn test_azblob_backend() -> Result<()> { #[tokio::test] async fn test_gcs_backend() -> Result<()> { - logging::init_default_ut_logging(); + common_telemetry::init_default_ut_logging(); if let Ok(container) = env::var("GT_AZBLOB_CONTAINER") { if !container.is_empty() { - logging::info!("Running azblob test."); + info!("Running azblob test."); let mut builder = Gcs::default(); builder @@ -219,7 +219,7 @@ async fn test_gcs_backend() -> Result<()> { #[tokio::test] async fn test_file_backend_with_lru_cache() -> Result<()> { - logging::init_default_ut_logging(); + common_telemetry::init_default_ut_logging(); let data_dir = create_temp_dir("test_file_backend_with_lru_cache"); let tmp_dir = create_temp_dir("test_file_backend_with_lru_cache"); diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs index b25228b5d068..7aac8204b03f 100644 --- a/src/operator/src/request.rs +++ b/src/operator/src/request.rs @@ -20,8 +20,8 @@ use catalog::CatalogManagerRef; use common_catalog::build_db_string; use common_meta::node_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; -use common_telemetry::logging::{error, info}; use common_telemetry::tracing_context::TracingContext; +use common_telemetry::{error, info}; use futures_util::future; use partition::manager::{PartitionInfo, PartitionRuleManagerRef}; use 
session::context::QueryContextRef; diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index 111adc24771e..a0d96e53a8ec 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -22,7 +22,7 @@ use catalog::{OpenSystemTableHook, RegisterSystemTableRequest}; use common_catalog::consts::{default_engine, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_query::Output; -use common_telemetry::logging; +use common_telemetry::info; use futures::future::FutureExt; use query::QueryEngineRef; use servers::query_handler::grpc::GrpcQueryHandlerRef; @@ -106,11 +106,11 @@ impl ScriptManager { let mut compiled = self.compiled.write().unwrap(); let _ = compiled.insert(name.to_string(), script.clone()); } - logging::info!("Compiled and cached script: {}", name); + info!("Compiled and cached script: {}", name); script.as_ref().register_udf().await; - logging::info!("Script register as UDF: {}", name); + info!("Script register as UDF: {}", name); Ok(script) } diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 676118feb854..d9becf552164 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -25,7 +25,7 @@ use catalog::error::CompileScriptInternalSnafu; use common_error::ext::{BoxedError, ErrorExt}; use common_query::OutputData; use common_recordbatch::{util as record_util, RecordBatch, SendableRecordBatchStream}; -use common_telemetry::logging; +use common_telemetry::{debug, info, warn}; use common_time::util; use datafusion::datasource::DefaultTableSource; use datafusion::logical_expr::{and, col, lit}; @@ -127,7 +127,7 @@ impl ScriptsTable { script_list.extend(part_of_scripts_list); } - logging::info!( + info!( "Found {} scripts in {}", script_list.len(), table_info.full_table_name() @@ -137,16 +137,15 @@ impl ScriptsTable { match PyScript::from_script(&script, query_engine.clone()) { Ok(script) => { script.register_udf().await; - logging::debug!( + debug!( "Script in `scripts` system table re-register as UDF: {}", name ); } Err(err) => { - logging::warn!( + warn!( r#"Failed to compile script "{}"" in `scripts` table: {}"#, - name, - err + name, err ); } } @@ -189,7 +188,7 @@ impl ScriptsTable { .map_err(BoxedError::new) .context(InsertScriptSnafu { name })?; - logging::info!( + info!( "Inserted script: {} into scripts table: {}, output: {:?}.", name, table_info.full_table_name(), diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 23cc9ebb4174..7afc36c8b58f 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -29,8 +29,7 @@ use async_trait::async_trait; use common_grpc::channel_manager::{ DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, }; -use common_telemetry::logging::info; -use common_telemetry::{error, warn}; +use common_telemetry::{error, info, warn}; use futures::FutureExt; use snafu::{ensure, OptionExt, ResultExt}; use tokio::net::TcpListener; diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index a79217e6ee09..9ee670795a13 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -28,7 +28,7 @@ use common_error::status_code::StatusCode; use common_query::Output; use common_runtime::Runtime; use common_telemetry::tracing_context::{FutureExt, TracingContext}; -use common_telemetry::{logging, tracing}; +use common_telemetry::{debug, error, tracing}; use common_time::timezone::parse_timezone; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, 
ResultExt}; @@ -91,10 +91,10 @@ impl GreptimeRequestHandler { .await .map_err(|e| { if e.status_code().should_log_error() { - logging::error!(e; "Failed to handle request"); + error!(e; "Failed to handle request"); } else { // Currently, we still print a debug log. - logging::debug!("Failed to handle request, err: {:?}", e); + debug!("Failed to handle request, err: {:?}", e); } e }) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 861eb36f99ef..4ba4e56b088c 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -31,7 +31,7 @@ use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_error::status_code::StatusCode; use common_recordbatch::RecordBatch; -use common_telemetry::logging::{error, info}; +use common_telemetry::{error, info}; use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datatypes::data_type::DataType; diff --git a/src/servers/src/http/error_result.rs b/src/servers/src/http/error_result.rs index dac93c266ce2..6e063655f8ed 100644 --- a/src/servers/src/http/error_result.rs +++ b/src/servers/src/http/error_result.rs @@ -17,7 +17,7 @@ use axum::response::{IntoResponse, Response}; use axum::Json; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_telemetry::logging::{debug, error}; +use common_telemetry::{debug, error}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; diff --git a/src/servers/src/http/pprof.rs b/src/servers/src/http/pprof.rs index 23a9a2c2cb2e..1a4b282426f7 100644 --- a/src/servers/src/http/pprof.rs +++ b/src/servers/src/http/pprof.rs @@ -23,7 +23,6 @@ pub mod handler { use axum::extract::Query; use axum::http::StatusCode; use axum::response::IntoResponse; - use common_telemetry::logging; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -64,7 +63,7 @@ pub mod handler { #[axum_macros::debug_handler] pub async fn pprof_handler(Query(req): Query<PprofQuery>) -> Result<impl IntoResponse> { - logging::info!("start pprof, request: {:?}", req); + info!("start pprof, request: {:?}", req); let profiling = Profiling::new(Duration::from_secs(req.seconds), req.frequency.into()); let body = match req.output { @@ -76,7 +75,7 @@ pub mod handler { Output::Flamegraph => profiling.dump_flamegraph().await.context(DumpPprofSnafu)?, }; - logging::info!("finish pprof"); + info!("finish pprof"); Ok((StatusCode::OK, body)) } diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 9e43aea7b42b..d0f4372ef1fb 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -24,7 +24,7 @@ use chrono::{NaiveDate, NaiveDateTime}; use common_catalog::parse_optional_catalog_and_schema_from_db_string; use common_error::ext::ErrorExt; use common_query::Output; -use common_telemetry::{debug, error, logging, tracing, warn}; +use common_telemetry::{debug, error, tracing, warn}; use datatypes::prelude::ConcreteDataType; use itertools::Itertools; use opensrv_mysql::{ @@ -327,7 +327,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi } }; - logging::debug!("Mysql execute prepared plan: {}", plan.display_indent()); + debug!("Mysql execute prepared plan: {}", plan.display_indent()); vec![ self.do_exec_plan(&sql_plan.query, plan, query_ctx.clone()) .await, @@ -335,7 +335,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi } None => { let query = replace_params(params, sql_plan.query); - logging::debug!("Mysql execute replaced query: {}", query); + debug!("Mysql execute replaced query: {}", query); self.do_query(&query,
query_ctx.clone()).await } }; diff --git a/src/servers/src/server.rs b/src/servers/src/server.rs index c2086f2658c1..129141893ec4 100644 --- a/src/servers/src/server.rs +++ b/src/servers/src/server.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_runtime::Runtime; -use common_telemetry::logging::{error, info}; +use common_telemetry::{error, info}; use futures::future::{try_join_all, AbortHandle, AbortRegistration, Abortable}; use snafu::{ensure, ResultExt}; use tokio::sync::{Mutex, RwLock}; diff --git a/tests-integration/src/database.rs b/tests-integration/src/database.rs index 31254cefdcc2..5ad7c90686fe 100644 --- a/tests-integration/src/database.rs +++ b/tests-integration/src/database.rs @@ -30,7 +30,7 @@ use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_query::Output; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::RecordBatchStreamWrapper; -use common_telemetry::logging; +use common_telemetry::error; use common_telemetry::tracing_context::W3cTrace; use futures_util::StreamExt; use prost::Message; @@ -188,7 +188,7 @@ impl Database { addr: client.addr().to_string(), source: BoxedError::new(ServerSnafu { code, msg }.build()), }; - logging::error!( + error!( "Failed to do Flight get, addr: {}, code: {}, source: {:?}", client.addr(), tonic_code, diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index 28b1af971d75..b5ba0f658b94 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -20,7 +20,6 @@ use client::OutputData; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_query::Output; use common_recordbatch::util; -use common_telemetry::logging; use common_test_util::recordbatch::check_output_stream; use common_test_util::temp_dir; use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt64Vector, VectorRef}; @@ -1629,7 +1628,9 @@ async fn test_execute_copy_to_s3(instance: Arc<dyn MockInstance>) { #[apply(both_instances_cases)] async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) { - logging::init_default_ut_logging(); + use common_telemetry::info; + + common_telemetry::init_default_ut_logging(); if let Ok(bucket) = env::var("GT_S3_BUCKET") { if !bucket.is_empty() { let instance = instance.frontend(); @@ -1706,7 +1707,7 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) { "{} CONNECTION (ACCESS_KEY_ID='{}',SECRET_ACCESS_KEY='{}',REGION='{}')", test.sql, key_id, key, region, ); - logging::info!("Running sql: {}", sql); + info!("Running sql: {}", sql); let output = execute_sql(&instance, &sql).await.data; assert!(matches!(output, OutputData::AffectedRows(2))); @@ -1732,7 +1733,7 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) { #[apply(both_instances_cases)] async fn test_execute_copy_from_orc_with_cast(instance: Arc<dyn MockInstance>) { - logging::init_default_ut_logging(); + common_telemetry::init_default_ut_logging(); let instance = instance.frontend(); // setups @@ -1766,7 +1767,7 @@ async fn test_execute_copy_from_orc_with_cast(instance: Arc<dyn MockInstance>) { #[apply(both_instances_cases)] async fn test_execute_copy_from_orc(instance: Arc<dyn MockInstance>) { - logging::init_default_ut_logging(); + common_telemetry::init_default_ut_logging(); let instance = instance.frontend(); // setups @@ -1880,7 +1881,7 @@ async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) { #[apply(both_instances_cases)] async fn test_information_schema_dot_columns(instance: Arc<dyn MockInstance>) { - logging::init_default_ut_logging(); + common_telemetry::init_default_ut_logging(); let
instance = instance.frontend(); let sql = "create table another_table(i timestamp time index)";
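
Taken together, the hunks above are one mechanical migration: the blanket re-export (`pub use crate::{debug, error, info, trace, warn};`) is deleted from `src/common/telemetry/src/logging.rs`, and every call site imports the logging macros from the `common_telemetry` crate root instead of reaching them through the `logging` module. A minimal sketch of the before/after pattern, assuming only what the diff shows about `common_telemetry`'s public API (the `main` function, file path, and messages below are illustrative, not taken from the patch):

    // Before this patch, call sites went through the `logging` module re-export:
    //
    //     use common_telemetry::logging;
    //     logging::info!("server started");
    //
    // After this patch, the macros are imported from the crate root.
    use common_telemetry::{error, info};

    fn main() {
        // Installs a logger for the example; the test hunks above show this
        // function is re-exported at the crate root.
        common_telemetry::init_default_ut_logging();

        info!("server started");
        if let Err(e) = std::fs::read_to_string("/nonexistent/config.toml") {
            // The `err; format` form used throughout the hunks, e.g.
            // `error!(e; "Failed to recover procedure {}", procedure_id)`,
            // also resolves at the crate root; plain formatting works too.
            error!("failed to read config, error: {e}");
        }
    }

Note that only the macro re-exports inside `logging.rs` are removed; functions such as `init_default_ut_logging` remain re-exported at the crate root, which is why the test call sites are rewritten as `common_telemetry::init_default_ut_logging()` rather than dropped.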