From ded4fdc0cbd2a5c84988a04866a8770bb85e71fc Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 25 Jul 2024 17:35:29 +0800 Subject: [PATCH 1/8] feat: remove dedicated runtime for grpc, mysql and pg protocols Signed-off-by: Ruihang Xia --- src/datanode/src/datanode.rs | 15 ++----- src/datanode/src/region_server.rs | 10 ++--- src/datanode/src/tests.rs | 2 +- src/frontend/src/server.rs | 30 ++----------- src/meta-srv/src/procedure/utils.rs | 4 +- src/servers/src/grpc/builder.rs | 8 ++-- src/servers/src/grpc/greptime_handler.rs | 5 +-- src/servers/src/grpc/region_server.rs | 4 +- src/servers/src/mysql/server.rs | 47 +++++++++----------- src/servers/src/postgres/server.rs | 4 +- src/servers/src/server.rs | 6 +-- src/servers/tests/grpc/mod.rs | 16 +++---- src/servers/tests/mysql/mysql_server_test.rs | 16 +++---- src/servers/tests/postgres/mod.rs | 12 +++-- tests-integration/src/cluster.rs | 12 +++-- tests-integration/src/test_util.rs | 36 +++++++-------- tests-integration/tests/grpc.rs | 4 +- 17 files changed, 89 insertions(+), 142 deletions(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index d4ba2f77cc24..5c885dfc9cd5 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -25,7 +25,6 @@ use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue} use common_meta::kv_backend::KvBackendRef; use common_meta::wal_options_allocator::prepare_wal_options; pub use common_procedure::options::ProcedureConfig; -use common_runtime::Runtime; use common_telemetry::{error, info, warn}; use common_wal::config::kafka::DatanodeKafkaConfig; use common_wal::config::raft_engine::RaftEngineConfig; @@ -55,8 +54,8 @@ use tokio::sync::Notify; use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; use crate::error::{ self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, - MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu, - ShutdownServerSnafu, StartServerSnafu, + MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu, ShutdownServerSnafu, + StartServerSnafu, }; use crate::event_listener::{ new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef, @@ -326,18 +325,10 @@ impl DatanodeBuilder { ); let query_engine = query_engine_factory.query_engine(); - let runtime = Arc::new( - Runtime::builder() - .worker_threads(opts.grpc.runtime_size) - .thread_name("io-handlers") - .build() - .context(RuntimeResourceSnafu)?, - ); - let table_provider_factory = Arc::new(DummyTableProviderFactory); let mut region_server = RegionServer::with_table_provider( query_engine, - runtime, + common_runtime::bg_runtime(), event_listener, table_provider_factory, ); diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 83225334eaf9..c42827698253 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -82,7 +82,7 @@ pub struct RegionStat { impl RegionServer { pub fn new( query_engine: QueryEngineRef, - runtime: Arc, + runtime: Runtime, event_listener: RegionServerEventListenerRef, ) -> Self { Self::with_table_provider( @@ -95,7 +95,7 @@ impl RegionServer { pub fn with_table_provider( query_engine: QueryEngineRef, - runtime: Arc, + runtime: Runtime, event_listener: RegionServerEventListenerRef, table_provider_factory: TableProviderFactoryRef, ) -> Self { @@ -286,7 +286,7 @@ impl RegionServer { } } - pub fn runtime(&self) -> Arc { + pub fn runtime(&self) -> Runtime { 
self.inner.runtime.clone() } @@ -447,7 +447,7 @@ struct RegionServerInner { engines: RwLock>, region_map: DashMap, query_engine: QueryEngineRef, - runtime: Arc, + runtime: Runtime, event_listener: RegionServerEventListenerRef, table_provider_factory: TableProviderFactoryRef, } @@ -475,7 +475,7 @@ impl Debug for CurrentEngine { impl RegionServerInner { pub fn new( query_engine: QueryEngineRef, - runtime: Arc, + runtime: Runtime, event_listener: RegionServerEventListenerRef, table_provider_factory: TableProviderFactoryRef, ) -> Self { diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index b115b366c4af..645f871acdec 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -92,7 +92,7 @@ impl QueryEngine for MockQueryEngine { pub fn mock_region_server() -> RegionServer { RegionServer::new( Arc::new(MockQueryEngine), - Arc::new(Runtime::builder().build().unwrap()), + Runtime::builder().build().unwrap(), Box::new(NoopRegionServerEventListener), ) } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index a80415076195..5bad9f679f04 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use auth::UserProviderRef; use common_base::Plugins; use common_config::{Configurable, Mode}; -use common_runtime::Builder as RuntimeBuilder; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig}; @@ -65,20 +64,12 @@ where } pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result { - let grpc_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(opts.runtime_size) - .thread_name("grpc-handlers") - .build() - .context(error::RuntimeResourceSnafu)?, - ); - let grpc_config = GrpcServerConfig { max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize, max_send_message_size: opts.max_send_message_size.as_bytes() as usize, tls: opts.tls.clone(), }; - let builder = GrpcServerBuilder::new(grpc_config, grpc_runtime) + let builder = GrpcServerBuilder::new(grpc_config, common_runtime::bg_runtime()) .with_tls_config(opts.tls.clone()) .context(error::InvalidTlsConfigSnafu)?; Ok(builder) @@ -216,15 +207,8 @@ where // will not watch if watch is disabled in tls option maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?; - let mysql_io_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(opts.runtime_size) - .thread_name("mysql-io-handlers") - .build() - .context(error::RuntimeResourceSnafu)?, - ); let mysql_server = MysqlServer::create_server( - mysql_io_runtime, + common_runtime::bg_runtime(), Arc::new(MysqlSpawnRef::new( ServerSqlQueryHandlerAdapter::arc(instance.clone()), user_provider.clone(), @@ -249,19 +233,11 @@ where maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?; - let pg_io_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(opts.runtime_size) - .thread_name("pg-io-handlers") - .build() - .context(error::RuntimeResourceSnafu)?, - ); - let pg_server = Box::new(PostgresServer::new( ServerSqlQueryHandlerAdapter::arc(instance.clone()), opts.tls.should_force_tls(), tls_server_config, - pg_io_runtime, + common_runtime::bg_runtime(), user_provider.clone(), )) as Box; diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 67b0f496c520..09f0400ba118 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -34,7 +34,7 @@ 
pub mod mock { /// An mock implementation of region server that simply echoes the request. #[derive(Clone)] pub struct EchoRegionServer { - runtime: Arc, + runtime: Runtime, received_requests: mpsc::Sender, } @@ -43,7 +43,7 @@ pub mod mock { let (tx, rx) = mpsc::channel(10); ( Self { - runtime: Arc::new(RuntimeBuilder::default().worker_threads(2).build().unwrap()), + runtime: RuntimeBuilder::default().worker_threads(2).build().unwrap(), received_requests: tx, }, rx, diff --git a/src/servers/src/grpc/builder.rs b/src/servers/src/grpc/builder.rs index f9a4dc2dbf8b..c2481c22130a 100644 --- a/src/servers/src/grpc/builder.rs +++ b/src/servers/src/grpc/builder.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use api::v1::greptime_database_server::GreptimeDatabaseServer; use api::v1::prometheus_gateway_server::PrometheusGatewayServer; use api::v1::region::region_server::RegionServer; @@ -65,13 +63,13 @@ macro_rules! add_service { pub struct GrpcServerBuilder { config: GrpcServerConfig, - runtime: Arc, + runtime: Runtime, routes_builder: RoutesBuilder, tls_config: Option, } impl GrpcServerBuilder { - pub fn new(config: GrpcServerConfig, runtime: Arc) -> Self { + pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self { Self { config, runtime, @@ -84,7 +82,7 @@ impl GrpcServerBuilder { &self.config } - pub fn runtime(&self) -> &Arc { + pub fn runtime(&self) -> &Runtime { &self.runtime } diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 726e8dd3304c..4fd0666d18af 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -14,7 +14,6 @@ //! Handler for Greptime Database service. It's implemented by frontend. 
-use std::sync::Arc; use std::time::Instant; use api::helper::request_type; @@ -42,14 +41,14 @@ use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; pub struct GreptimeRequestHandler { handler: ServerGrpcQueryHandlerRef, user_provider: Option, - runtime: Option>, + runtime: Option, } impl GreptimeRequestHandler { pub fn new( handler: ServerGrpcQueryHandlerRef, user_provider: Option, - runtime: Option>, + runtime: Option, ) -> Self { Self { handler, diff --git a/src/servers/src/grpc/region_server.rs b/src/servers/src/grpc/region_server.rs index e25ee209f0f5..366d90151f77 100644 --- a/src/servers/src/grpc/region_server.rs +++ b/src/servers/src/grpc/region_server.rs @@ -38,11 +38,11 @@ pub type RegionServerHandlerRef = Arc; #[derive(Clone)] pub struct RegionServerRequestHandler { handler: Arc, - runtime: Arc, + runtime: Runtime, } impl RegionServerRequestHandler { - pub fn new(handler: Arc, runtime: Arc) -> Self { + pub fn new(handler: Arc, runtime: Runtime) -> Self { Self { handler, runtime } } diff --git a/src/servers/src/mysql/server.rs b/src/servers/src/mysql/server.rs index d17d0e58d368..dd875750927a 100644 --- a/src/servers/src/mysql/server.rs +++ b/src/servers/src/mysql/server.rs @@ -109,7 +109,7 @@ pub struct MysqlServer { impl MysqlServer { pub fn create_server( - io_runtime: Arc, + io_runtime: Runtime, spawn_ref: Arc, spawn_config: Arc, ) -> Box { @@ -120,18 +120,14 @@ impl MysqlServer { }) } - fn accept( - &self, - io_runtime: Arc, - stream: AbortableStream, - ) -> impl Future { + fn accept(&self, io_runtime: Runtime, stream: AbortableStream) -> impl Future { let spawn_ref = self.spawn_ref.clone(); let spawn_config = self.spawn_config.clone(); stream.for_each(move |tcp_stream| { - let io_runtime = io_runtime.clone(); let spawn_ref = spawn_ref.clone(); let spawn_config = spawn_config.clone(); + let io_runtime = io_runtime.clone(); async move { match tcp_stream { @@ -140,11 +136,13 @@ impl MysqlServer { if let Err(e) = io_stream.set_nodelay(true) { warn!(e; "Failed to set TCP nodelay"); } - if let Err(error) = - Self::handle(io_stream, io_runtime, spawn_ref, spawn_config).await - { - warn!(error; "Unexpected error when handling TcpStream"); - }; + io_runtime.spawn(async move { + if let Err(error) = + Self::handle(io_stream, spawn_ref, spawn_config).await + { + warn!(error; "Unexpected error when handling TcpStream"); + }; + }); } }; } @@ -153,24 +151,23 @@ impl MysqlServer { async fn handle( stream: TcpStream, - io_runtime: Arc, spawn_ref: Arc, spawn_config: Arc, ) -> Result<()> { debug!("MySQL connection coming from: {}", stream.peer_addr()?); - let _handle = io_runtime.spawn(async move { - crate::metrics::METRIC_MYSQL_CONNECTIONS.inc(); - if let Err(e) = Self::do_handle(stream, spawn_ref, spawn_config).await { - if let Error::InternalIo { error } = &e && error.kind() == std::io::ErrorKind::ConnectionAborted { - // This is a client-side error, we don't need to log it. - } else { - // TODO(LFC): Write this error to client as well, in MySQL text protocol. - // Looks like we have to expose opensrv-mysql's `PacketWriter`? - warn!(e; "Internal error occurred during query exec, server actively close the channel to let client try next time"); - } + crate::metrics::METRIC_MYSQL_CONNECTIONS.inc(); + if let Err(e) = Self::do_handle(stream, spawn_ref, spawn_config).await { + if let Error::InternalIo { error } = &e + && error.kind() == std::io::ErrorKind::ConnectionAborted + { + // This is a client-side error, we don't need to log it. 
+ } else { + // TODO(LFC): Write this error to client as well, in MySQL text protocol. + // Looks like we have to expose opensrv-mysql's `PacketWriter`? + warn!(e; "Internal error occurred during query exec, server actively close the channel to let client try next time"); } - crate::metrics::METRIC_MYSQL_CONNECTIONS.dec(); - }); + } + crate::metrics::METRIC_MYSQL_CONNECTIONS.dec(); Ok(()) } diff --git a/src/servers/src/postgres/server.rs b/src/servers/src/postgres/server.rs index 6b397dbb472c..ff64732ca17e 100644 --- a/src/servers/src/postgres/server.rs +++ b/src/servers/src/postgres/server.rs @@ -42,7 +42,7 @@ impl PostgresServer { query_handler: ServerSqlQueryHandlerRef, force_tls: bool, tls_server_config: Arc, - io_runtime: Arc, + io_runtime: Runtime, user_provider: Option, ) -> PostgresServer { let make_handler = Arc::new( @@ -62,7 +62,7 @@ impl PostgresServer { fn accept( &self, - io_runtime: Arc, + io_runtime: Runtime, accepting_stream: AbortableStream, ) -> impl Future { let handler_maker = self.make_handler.clone(); diff --git a/src/servers/src/server.rs b/src/servers/src/server.rs index 26dfd1ce689d..5d93c7e8af91 100644 --- a/src/servers/src/server.rs +++ b/src/servers/src/server.rs @@ -183,11 +183,11 @@ impl AcceptTask { pub(crate) struct BaseTcpServer { name: String, accept_task: Mutex, - io_runtime: Arc, + io_runtime: Runtime, } impl BaseTcpServer { - pub(crate) fn create_server(name: impl Into, io_runtime: Arc) -> Self { + pub(crate) fn create_server(name: impl Into, io_runtime: Runtime) -> Self { let (abort_handle, registration) = AbortHandle::new_pair(); Self { name: name.into(), @@ -218,7 +218,7 @@ impl BaseTcpServer { task.start_with(join_handle, &self.name) } - pub(crate) fn io_runtime(&self) -> Arc { + pub(crate) fn io_runtime(&self) -> Runtime { self.io_runtime.clone() } } diff --git a/src/servers/tests/grpc/mod.rs b/src/servers/tests/grpc/mod.rs index b2b198d851b8..021144745fad 100644 --- a/src/servers/tests/grpc/mod.rs +++ b/src/servers/tests/grpc/mod.rs @@ -40,14 +40,14 @@ use crate::{create_testing_grpc_query_handler, LOCALHOST_WITH_0}; struct MockGrpcServer { query_handler: ServerGrpcQueryHandlerRef, user_provider: Option, - runtime: Arc, + runtime: Runtime, } impl MockGrpcServer { fn new( query_handler: ServerGrpcQueryHandlerRef, user_provider: Option, - runtime: Arc, + runtime: Runtime, ) -> Self { Self { query_handler, @@ -107,13 +107,11 @@ impl Server for MockGrpcServer { fn create_grpc_server(table: TableRef) -> Result> { let query_handler = create_testing_grpc_query_handler(table); - let io_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(4) - .thread_name("grpc-io-handlers") - .build() - .unwrap(), - ); + let io_runtime = RuntimeBuilder::default() + .worker_threads(4) + .thread_name("grpc-io-handlers") + .build() + .unwrap(); let provider = MockUserProvider::default(); diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs index 6f5b20dc7e49..9e65dc861e1e 100644 --- a/src/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ -29,6 +29,7 @@ use rand::rngs::StdRng; use rand::Rng; use servers::error::Result; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; +use servers::query_handler::sql::SqlQueryHandler; use servers::server::Server; use servers::tls::{ReloadableTlsServerConfig, TlsOption}; use table::test_util::MemTable; @@ -45,14 +46,13 @@ struct MysqlOpts<'a> { } fn create_mysql_server(table: TableRef, opts: MysqlOpts<'_>) 
-> Result> { - let query_handler = create_testing_sql_query_handler(table); - let io_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(4) - .thread_name("mysql-io-handlers") - .build() - .unwrap(), - ); + let query_handler: Arc + Send + Sync> = + create_testing_sql_query_handler(table); + let io_runtime = RuntimeBuilder::default() + .worker_threads(4) + .thread_name("mysql-io-handlers") + .build() + .unwrap(); let mut provider = MockUserProvider::default(); if let Some(auth_info) = opts.auth_info { diff --git a/src/servers/tests/postgres/mod.rs b/src/servers/tests/postgres/mod.rs index 3dec74d0e144..ad135dd9d0c3 100644 --- a/src/servers/tests/postgres/mod.rs +++ b/src/servers/tests/postgres/mod.rs @@ -43,13 +43,11 @@ fn create_postgres_server( auth_info: Option, ) -> Result> { let instance = Arc::new(create_testing_instance(table)); - let io_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(4) - .thread_name("postgres-io-handlers") - .build() - .unwrap(), - ); + let io_runtime = RuntimeBuilder::default() + .worker_threads(4) + .thread_name("postgres-io-handlers") + .build() + .unwrap(); let user_provider: Option = if check_pwd { let mut provider = MockUserProvider::default(); if let Some(info) = auth_info { diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 1c01aaf58479..19cb36a5b9b1 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -421,13 +421,11 @@ async fn build_datanode_clients( async fn create_datanode_client(datanode: &Datanode) -> (String, Client) { let (client, server) = tokio::io::duplex(1024); - let runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(2) - .thread_name("grpc-handlers") - .build() - .unwrap(), - ); + let runtime = RuntimeBuilder::default() + .worker_threads(2) + .thread_name("grpc-handlers") + .build() + .unwrap(); let flight_handler = FlightCraftWrapper(datanode.region_server()); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 76f523071ba2..a1b132dad0c2 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -495,13 +495,11 @@ pub async fn setup_grpc_server_with( ) -> (String, TestGuard, Arc) { let instance = setup_standalone_instance(name, store_type).await; - let runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(2) - .thread_name("grpc-handlers") - .build() - .unwrap(), - ); + let runtime = RuntimeBuilder::default() + .worker_threads(2) + .thread_name("grpc-handlers") + .build() + .unwrap(); let fe_instance_ref = instance.instance.clone(); @@ -550,13 +548,11 @@ pub async fn setup_mysql_server_with_user_provider( ) -> (String, TestGuard, Arc>) { let instance = setup_standalone_instance(name, store_type).await; - let runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(2) - .thread_name("mysql-runtime") - .build() - .unwrap(), - ); + let runtime = RuntimeBuilder::default() + .worker_threads(2) + .thread_name("mysql-runtime") + .build() + .unwrap(); let fe_mysql_addr = format!("127.0.0.1:{}", ports::get_port()); @@ -607,13 +603,11 @@ pub async fn setup_pg_server_with_user_provider( ) -> (String, TestGuard, Arc>) { let instance = setup_standalone_instance(name, store_type).await; - let runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(2) - .thread_name("pg-runtime") - .build() - .unwrap(), - ); + let runtime = RuntimeBuilder::default() + .worker_threads(2) + .thread_name("pg-runtime") + .build() + .unwrap(); let fe_pg_addr = 
format!("127.0.0.1:{}", ports::get_port()); diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 33332170db16..664e7c7a674f 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use api::v1::alter_expr::Kind; use api::v1::promql_request::Promql; use api::v1::{ @@ -790,7 +788,7 @@ pub async fn test_grpc_tls_config(store_type: StorageType) { max_send_message_size: 1024, tls, }; - let runtime = Arc::new(Runtime::builder().build().unwrap()); + let runtime = Runtime::builder().build().unwrap(); let grpc_builder = GrpcServerBuilder::new(config.clone(), runtime).with_tls_config(config.tls); assert!(grpc_builder.is_err()); From a88b7c7cec8a62dc0ead9079c1152f087dd4fd39 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 25 Jul 2024 21:32:31 +0800 Subject: [PATCH 2/8] remove other runtimes Signed-off-by: Ruihang Xia --- config/config.md | 20 ++-- config/datanode.example.toml | 6 +- config/frontend.example.toml | 6 +- config/metasrv.example.toml | 6 +- config/standalone.example.toml | 6 +- docs/how-to/how-to-write-fuzz-tests.md | 2 +- src/cmd/tests/load_config_test.rs | 20 ++-- src/common/datasource/src/file_format/csv.rs | 2 +- src/common/datasource/src/file_format/json.rs | 2 +- src/common/greptimedb-telemetry/src/lib.rs | 2 +- src/common/grpc/src/channel_manager.rs | 2 +- src/common/macro/src/admin_fn.rs | 4 +- src/common/procedure/src/local.rs | 4 +- src/common/procedure/src/local/runner.rs | 2 +- src/common/runtime/src/global.rs | 105 +++++++----------- src/common/runtime/src/lib.rs | 6 +- src/common/runtime/src/repeated_task.rs | 4 +- src/datanode/src/alive_keeper.rs | 4 +- src/datanode/src/datanode.rs | 4 +- src/datanode/src/heartbeat/handler.rs | 2 +- src/datanode/src/heartbeat/task_tracker.rs | 2 +- src/datanode/src/region_server.rs | 2 +- src/flow/src/adapter.rs | 2 +- src/flow/src/server.rs | 2 +- src/frontend/src/server.rs | 6 +- src/log-store/src/raft_engine/log_store.rs | 4 +- src/meta-srv/src/election/etcd.rs | 2 +- src/meta-srv/src/handler.rs | 2 +- src/meta-srv/src/handler/failure_handler.rs | 2 +- src/meta-srv/src/lock.rs | 2 +- src/meta-srv/src/metasrv.rs | 6 +- .../downgrade_leader_region.rs | 4 +- .../src/procedure/region_migration/manager.rs | 2 +- .../procedure/region_migration/test_util.rs | 2 +- .../upgrade_candidate_region.rs | 4 +- src/meta-srv/src/service/heartbeat.rs | 2 +- src/mito2/src/compaction/compactor.rs | 2 +- src/mito2/src/engine.rs | 2 +- src/mito2/src/manifest/checkpointer.rs | 2 +- src/mito2/src/read/scan_region.rs | 2 +- src/mito2/src/schedule/scheduler.rs | 2 +- src/mito2/src/sst/index/puffin_manager.rs | 2 +- src/mito2/src/worker.rs | 4 +- src/mito2/src/worker/handle_drop.rs | 2 +- src/mito2/src/worker/handle_manifest.rs | 6 +- src/mito2/src/worker/handle_open.rs | 2 +- src/operator/src/delete.rs | 2 +- src/operator/src/insert.rs | 4 +- src/operator/src/request.rs | 2 +- .../fs_puffin_manager/reader.rs | 2 +- .../puffin_manager/stager/bounded_stager.rs | 2 +- src/script/src/python/utils.rs | 4 +- src/servers/src/export_metrics.rs | 4 +- src/servers/src/grpc.rs | 2 +- src/servers/src/grpc/flight/stream.rs | 2 +- src/servers/src/http.rs | 2 +- src/servers/src/mysql/server.rs | 2 +- src/servers/src/postgres/server.rs | 2 +- .../failover/fuzz_failover_metric_regions.rs | 2 +- .../failover/fuzz_failover_mito_regions.rs | 2 +- 
.../targets/fuzz_alter_logical_table.rs | 2 +- tests-fuzz/targets/fuzz_alter_table.rs | 2 +- tests-fuzz/targets/fuzz_create_database.rs | 2 +- .../targets/fuzz_create_logical_table.rs | 2 +- tests-fuzz/targets/fuzz_create_table.rs | 2 +- tests-fuzz/targets/fuzz_insert.rs | 2 +- .../targets/fuzz_insert_logical_table.rs | 2 +- .../migration/fuzz_migrate_mito_regions.rs | 2 +- .../unstable/fuzz_create_table_standalone.rs | 2 +- 69 files changed, 146 insertions(+), 187 deletions(-) diff --git a/config/config.md b/config/config.md index fb322e3e604d..b1e39e31c3fa 100644 --- a/config/config.md +++ b/config/config.md @@ -16,9 +16,8 @@ | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. | | `default_timezone` | String | `None` | The default timezone of the server. | | `runtime` | -- | -- | The runtime options. | -| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | -| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | -| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. | +| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. | | `http` | -- | -- | The HTTP server options. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | | `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. | @@ -169,9 +168,8 @@ | `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. | | `default_timezone` | String | `None` | The default timezone of the server. | | `runtime` | -- | -- | The runtime options. | -| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | -| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | -| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. | +| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. | | `heartbeat` | -- | -- | The heartbeat options. | | `heartbeat.interval` | String | `18s` | Interval for sending heartbeat messages to the metasrv. | | `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | @@ -261,9 +259,8 @@ | `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. | | `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running in cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). | | `runtime` | -- | -- | The runtime options. | -| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | -| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | -| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. | +| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. | | `procedure` | -- | -- | Procedure storage options. | | `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. | | `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially | @@ -337,9 +334,8 @@ | `grpc.tls.key_path` | String | `None` | Private key file path. | | `grpc.tls.watch` | Bool | `false` | Watch for Certificate and key file change and auto reload.
For now, gRPC tls config does not support auto reload. | | `runtime` | -- | -- | The runtime options. | -| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | -| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | -| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. | +| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. | | `heartbeat` | -- | -- | The heartbeat options. | | `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. | | `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index ea8c95cf7005..97e4fae1d503 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -73,11 +73,9 @@ watch = false ## The runtime options. [runtime] ## The number of threads to execute the runtime for global read operations. -read_rt_size = 8 +global_rt_size = 8 ## The number of threads to execute the runtime for global write operations. -write_rt_size = 8 -## The number of threads to execute the runtime for global background operations. -bg_rt_size = 4 +compact_rt_size = 4 ## The heartbeat options. [heartbeat] diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 6c2bddbc528c..534114e6dcfe 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -8,11 +8,9 @@ default_timezone = "UTC" ## The runtime options. [runtime] ## The number of threads to execute the runtime for global read operations. -read_rt_size = 8 +global_rt_size = 8 ## The number of threads to execute the runtime for global write operations. -write_rt_size = 8 -## The number of threads to execute the runtime for global background operations. -bg_rt_size = 4 +compact_rt_size = 4 ## The heartbeat options. [heartbeat] diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index e341479ca8f1..b2976aa4f84b 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -34,11 +34,9 @@ enable_region_failover = false ## The runtime options. [runtime] ## The number of threads to execute the runtime for global read operations. -read_rt_size = 8 +global_rt_size = 8 ## The number of threads to execute the runtime for global write operations. -write_rt_size = 8 -## The number of threads to execute the runtime for global background operations. -bg_rt_size = 4 +compact_rt_size = 4 ## Procedure storage options. [procedure] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 0944d5985e80..8f5fcf9bebb3 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -11,11 +11,9 @@ default_timezone = "UTC" ## The runtime options. [runtime] ## The number of threads to execute the runtime for global read operations. -read_rt_size = 8 +global_rt_size = 8 ## The number of threads to execute the runtime for global write operations. -write_rt_size = 8 -## The number of threads to execute the runtime for global background operations. -bg_rt_size = 4 +compact_rt_size = 4 ## The HTTP server options. 
[http] diff --git a/docs/how-to/how-to-write-fuzz-tests.md b/docs/how-to/how-to-write-fuzz-tests.md index 88d6c0914a5d..113b71027a15 100644 --- a/docs/how-to/how-to-write-fuzz-tests.md +++ b/docs/how-to/how-to-write-fuzz-tests.md @@ -105,7 +105,7 @@ use tests_fuzz::utils::{init_greptime_connections, Connections}; fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + common_runtime::block_on_global(async { let Connections { mysql } = init_greptime_connections().await; let mut rng = ChaChaRng::seed_from_u64(input.seed); let columns = rng.gen_range(2..30); diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index fff3f3e8c9a6..a6a632805951 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -46,9 +46,8 @@ fn test_load_datanode_example_config() { let expected = GreptimeOptions:: { runtime: RuntimeOptions { - read_rt_size: 8, - write_rt_size: 8, - bg_rt_size: 4, + global_rt_size: 8, + compact_rt_size: 4, }, component: DatanodeOptions { node_id: Some(42), @@ -119,9 +118,8 @@ fn test_load_frontend_example_config() { .unwrap(); let expected = GreptimeOptions:: { runtime: RuntimeOptions { - read_rt_size: 8, - write_rt_size: 8, - bg_rt_size: 4, + global_rt_size: 8, + compact_rt_size: 4, }, component: FrontendOptions { default_timezone: Some("UTC".to_string()), @@ -167,9 +165,8 @@ fn test_load_metasrv_example_config() { .unwrap(); let expected = GreptimeOptions:: { runtime: RuntimeOptions { - read_rt_size: 8, - write_rt_size: 8, - bg_rt_size: 4, + global_rt_size: 8, + compact_rt_size: 4, }, component: MetasrvOptions { selector: SelectorType::LeaseBased, @@ -200,9 +197,8 @@ fn test_load_standalone_example_config() { .unwrap(); let expected = GreptimeOptions:: { runtime: RuntimeOptions { - read_rt_size: 8, - write_rt_size: 8, - bg_rt_size: 4, + global_rt_size: 8, + compact_rt_size: 4, }, component: StandaloneOptions { default_timezone: Some("UTC".to_string()), diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs index 1172004a9e19..66a2fc3c3f60 100644 --- a/src/common/datasource/src/file_format/csv.rs +++ b/src/common/datasource/src/file_format/csv.rs @@ -185,7 +185,7 @@ impl FileFormat for CsvFormat { let schema_infer_max_record = self.schema_infer_max_record; let has_header = self.has_header; - common_runtime::spawn_blocking_read(move || { + common_runtime::spawn_blocking_global(move || { let reader = SyncIoBridge::new(decoded); let (schema, _records_read) = diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs index 3599fcd4ec9d..c70a9beebbe4 100644 --- a/src/common/datasource/src/file_format/json.rs +++ b/src/common/datasource/src/file_format/json.rs @@ -101,7 +101,7 @@ impl FileFormat for JsonFormat { let schema_infer_max_record = self.schema_infer_max_record; - common_runtime::spawn_blocking_read(move || { + common_runtime::spawn_blocking_global(move || { let mut reader = BufReader::new(SyncIoBridge::new(decoded)); let iter = ValueIter::new(&mut reader, schema_infer_max_record); diff --git a/src/common/greptimedb-telemetry/src/lib.rs b/src/common/greptimedb-telemetry/src/lib.rs index 1f02c524e7c5..d681b20092c2 100644 --- a/src/common/greptimedb-telemetry/src/lib.rs +++ b/src/common/greptimedb-telemetry/src/lib.rs @@ -72,7 +72,7 @@ impl GreptimeDBTelemetryTask { match self { GreptimeDBTelemetryTask::Enable((task, _)) => { 
print_anonymous_usage_data_disclaimer(); - task.start(common_runtime::bg_runtime()) + task.start(common_runtime::global_runtime()) } GreptimeDBTelemetryTask::Disable => Ok(()), } diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index 0b77fa326ea1..eee173c3aa2a 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -225,7 +225,7 @@ impl ChannelManager { } let pool = self.pool.clone(); - let _handle = common_runtime::spawn_bg(async { + let _handle = common_runtime::spawn_global(async { recycle_channel_in_loop(pool, RECYCLE_CHANNEL_INTERVAL_SECS).await; }); info!( diff --git a/src/common/macro/src/admin_fn.rs b/src/common/macro/src/admin_fn.rs index be6b77a5ebf4..2c5a4e376a2f 100644 --- a/src/common/macro/src/admin_fn.rs +++ b/src/common/macro/src/admin_fn.rs @@ -219,7 +219,7 @@ fn build_struct( .create_mutable_vector(rows_num); if columns_num == 0 { - let result = common_runtime::block_on_read(async move { + let result = common_runtime::block_on_global(async move { #fn_name(handler, query_ctx, &[]).await })?; @@ -230,7 +230,7 @@ fn build_struct( .map(|vector| vector.get_ref(i)) .collect(); - let result = common_runtime::block_on_read(async move { + let result = common_runtime::block_on_global(async move { #fn_name(handler, query_ctx, &args).await })?; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 5fdd81a9562f..54ae88a5cf50 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -461,7 +461,7 @@ impl LocalManager { let tracing_context = TracingContext::from_current_span(); - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_global(async move { // Run the root procedure. // The task was moved to another runtime for execution. // In order not to interrupt tracing, a span needs to be created to continue tracing the current task. @@ -593,7 +593,7 @@ impl ProcedureManager for LocalManager { let task_inner = self.build_remove_outdated_meta_task(); task_inner - .start(common_runtime::bg_runtime()) + .start(common_runtime::global_runtime()) .context(StartRemoveOutdatedMetaTaskSnafu)?; *task = Some(task_inner); diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 1d1439e24ca8..d4ed98342125 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -393,7 +393,7 @@ impl Runner { // Add the id of the subprocedure to the metadata. self.meta.push_child(procedure_id); - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_global(async move { // Run the root procedure. runner.run().await }); diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs index c0bf1eabb069..a53ca1e893c9 100644 --- a/src/common/runtime/src/global.rs +++ b/src/common/runtime/src/global.rs @@ -23,29 +23,25 @@ use serde::{Deserialize, Serialize}; use crate::{Builder, JoinHandle, Runtime}; -const READ_WORKERS: usize = 8; -const WRITE_WORKERS: usize = 8; -const BG_WORKERS: usize = 4; +const GLOBAL_WORKERS: usize = 8; +const COMPACT_WORKERS: usize = 4; const HB_WORKERS: usize = 2; /// The options for the global runtimes. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct RuntimeOptions { - /// The number of threads to execute the runtime for global read operations. 
- pub read_rt_size: usize, - /// The number of threads to execute the runtime for global write operations. - pub write_rt_size: usize, - /// The number of threads to execute the runtime for global background operations. - pub bg_rt_size: usize, + /// The number of threads for the global default runtime. + pub global_rt_size: usize, + /// The number of threads to execute the runtime for compact operations. + pub compact_rt_size: usize, } impl Default for RuntimeOptions { fn default() -> Self { let cpus = num_cpus::get(); Self { - read_rt_size: cpus, - write_rt_size: cpus, - bg_rt_size: usize::max(cpus / 2, 1), + global_rt_size: cpus, + compact_rt_size: usize::max(cpus / 2, 1), } } } @@ -61,9 +57,8 @@ pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usi } struct GlobalRuntimes { - read_runtime: Runtime, - write_runtime: Runtime, - bg_runtime: Runtime, + global_runtime: Runtime, + compact_runtime: Runtime, hb_runtime: Runtime, } @@ -95,48 +90,38 @@ macro_rules! define_spawn { } impl GlobalRuntimes { - define_spawn!(read); - define_spawn!(write); - define_spawn!(bg); + define_spawn!(global); + define_spawn!(compact); define_spawn!(hb); - fn new( - read: Option, - write: Option, - background: Option, - heartbeat: Option, - ) -> Self { + fn new(global: Option, compact: Option, heartbeat: Option) -> Self { Self { - read_runtime: read - .unwrap_or_else(|| create_runtime("global-read", "read-worker", READ_WORKERS)), - write_runtime: write - .unwrap_or_else(|| create_runtime("global-write", "write-worker", WRITE_WORKERS)), - bg_runtime: background - .unwrap_or_else(|| create_runtime("global-bg", "bg-worker", BG_WORKERS)), + global_runtime: global + .unwrap_or_else(|| create_runtime("global", "global-worker", GLOBAL_WORKERS)), + compact_runtime: compact + .unwrap_or_else(|| create_runtime("compact", "compact-worker", COMPACT_WORKERS)), hb_runtime: heartbeat - .unwrap_or_else(|| create_runtime("global-hb", "hb-worker", HB_WORKERS)), + .unwrap_or_else(|| create_runtime("heartbeat", "hb-worker", HB_WORKERS)), } } } #[derive(Default)] struct ConfigRuntimes { - read_runtime: Option, - write_runtime: Option, - bg_runtime: Option, + global_runtime: Option, + compact_runtime: Option, hb_runtime: Option, already_init: bool, } static GLOBAL_RUNTIMES: Lazy = Lazy::new(|| { let mut c = CONFIG_RUNTIMES.lock().unwrap(); - let read = c.read_runtime.take(); - let write = c.write_runtime.take(); - let background = c.bg_runtime.take(); + let global = c.global_runtime.take(); + let compact = c.compact_runtime.take(); let heartbeat = c.hb_runtime.take(); c.already_init = true; - GlobalRuntimes::new(read, write, background, heartbeat) + GlobalRuntimes::new(global, compact, heartbeat) }); static CONFIG_RUNTIMES: Lazy> = @@ -152,20 +137,15 @@ pub fn init_global_runtimes(options: &RuntimeOptions) { START.call_once(move || { let mut c = CONFIG_RUNTIMES.lock().unwrap(); assert!(!c.already_init, "Global runtimes already initialized"); - c.read_runtime = Some(create_runtime( - "global-read", - "global-read-worker", - options.read_rt_size, + c.global_runtime = Some(create_runtime( + "global", + "global-worker", + options.global_rt_size, )); - c.write_runtime = Some(create_runtime( - "global-write", - "global-write-worker", - options.write_rt_size, - )); - c.bg_runtime = Some(create_runtime( - "global-bg", - "global-bg-worker", - options.bg_rt_size, + c.compact_runtime = Some(create_runtime( + "compact", + "compact-worker", + options.compact_rt_size, )); c.hb_runtime = Some(create_runtime("global-hb", 
"global-hb-worker", HB_WORKERS)); }); @@ -205,9 +185,8 @@ macro_rules! define_global_runtime_spawn { }; } -define_global_runtime_spawn!(read); -define_global_runtime_spawn!(write); -define_global_runtime_spawn!(bg); +define_global_runtime_spawn!(global); +define_global_runtime_spawn!(compact); define_global_runtime_spawn!(hb); #[cfg(test)] @@ -218,16 +197,13 @@ mod tests { #[test] fn test_spawn_block_on() { - let handle = spawn_read(async { 1 + 1 }); - assert_eq!(2, block_on_read(handle).unwrap()); - - let handle = spawn_write(async { 2 + 2 }); - assert_eq!(4, block_on_write(handle).unwrap()); + let handle = spawn_global(async { 1 + 1 }); + assert_eq!(2, block_on_global(handle).unwrap()); - let handle = spawn_bg(async { 3 + 3 }); - assert_eq!(6, block_on_bg(handle).unwrap()); + let handle = spawn_compact(async { 2 + 2 }); + assert_eq!(4, block_on_compact(handle).unwrap()); - let handle = spawn_bg(async { 4 + 4 }); + let handle = spawn_hb(async { 4 + 4 }); assert_eq!(8, block_on_hb(handle).unwrap()); } @@ -253,8 +229,7 @@ mod tests { }; } - define_spawn_blocking_test!(read); - define_spawn_blocking_test!(write); - define_spawn_blocking_test!(bg); + define_spawn_blocking_test!(global); + define_spawn_blocking_test!(compact); define_spawn_blocking_test!(hb); } diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index 868c2e70fc03..4429f6fa71ab 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -19,9 +19,9 @@ mod repeated_task; pub mod runtime; pub use global::{ - bg_runtime, block_on_bg, block_on_read, block_on_write, create_runtime, init_global_runtimes, - read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_hb, spawn_blocking_read, - spawn_blocking_write, spawn_hb, spawn_read, spawn_write, write_runtime, + block_on_compact, block_on_global, compact_runtime, create_runtime, global_runtime, + init_global_runtimes, spawn_blocking_compact, spawn_blocking_global, spawn_blocking_hb, + spawn_compact, spawn_global, spawn_hb, }; pub use crate::repeated_task::{BoxedTaskFunction, RepeatedTask, TaskFunction}; diff --git a/src/common/runtime/src/repeated_task.rs b/src/common/runtime/src/repeated_task.rs index cf9f02ffddb2..2431a2ee17fb 100644 --- a/src/common/runtime/src/repeated_task.rs +++ b/src/common/runtime/src/repeated_task.rs @@ -200,7 +200,7 @@ mod tests { let task = RepeatedTask::new(Duration::from_millis(100), Box::new(task_fn)); - task.start(crate::bg_runtime()).unwrap(); + task.start(crate::global_runtime()).unwrap(); tokio::time::sleep(Duration::from_millis(550)).await; task.stop().await.unwrap(); @@ -217,7 +217,7 @@ mod tests { let task = RepeatedTask::new(Duration::from_millis(100), Box::new(task_fn)) .with_initial_delay(Some(Duration::ZERO)); - task.start(crate::bg_runtime()).unwrap(); + task.start(crate::global_runtime()).unwrap(); tokio::time::sleep(Duration::from_millis(550)).await; task.stop().await.unwrap(); diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index 711ab5c3b6a6..072f216733e8 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -187,7 +187,7 @@ impl RegionAliveKeeper { let running = self.started.clone(); // Watches changes - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { loop { if !running.load(Ordering::Relaxed) { info!("RegionAliveKeeper stopped! 
Quits the watch loop!"); @@ -286,7 +286,7 @@ impl CountdownTaskHandle { region_id, rx, }; - let handler = common_runtime::spawn_bg(async move { + let handler = common_runtime::spawn_global(async move { countdown_task.run().await; }); diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 5c885dfc9cd5..0cee15031c87 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -223,7 +223,7 @@ impl DatanodeBuilder { if self.opts.init_regions_in_background { // Opens regions in background. - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { if let Err(err) = open_all_regions.await { error!(err; "Failed to open regions during the startup."); } @@ -328,7 +328,7 @@ impl DatanodeBuilder { let table_provider_factory = Arc::new(DummyTableProviderFactory); let mut region_server = RegionServer::with_table_provider( query_engine, - common_runtime::bg_runtime(), + common_runtime::global_runtime(), event_listener, table_provider_factory, ); diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 6b581e89ed83..48320f9b1101 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -108,7 +108,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { let region_server = self.region_server.clone(); let catchup_tasks = self.catchup_tasks.clone(); let handler = Self::build_handler(instruction)?; - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_global(async move { let reply = handler(HandlerContext { region_server, catchup_tasks, diff --git a/src/datanode/src/heartbeat/task_tracker.rs b/src/datanode/src/heartbeat/task_tracker.rs index 626754722204..977054661c52 100644 --- a/src/datanode/src/heartbeat/task_tracker.rs +++ b/src/datanode/src/heartbeat/task_tracker.rs @@ -156,7 +156,7 @@ impl TaskTracker { } else { let moved_inner = self.inner.clone(); let (tx, rx) = watch::channel(TaskState::::Running); - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { match fut.await { Ok(result) => { let _ = tx.send(TaskState::Done(result)); diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index c42827698253..20441b86f657 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -860,7 +860,7 @@ impl RegionServerInner { // complains "higher-ranked lifetime error". Rust can't prove some future is legit. // Possible related issue: https://github.com/rust-lang/rust/issues/102211 // - // The walkaround is to put the async functions in the `common_runtime::spawn_bg`. Or like + // The walkaround is to put the async functions in the `common_runtime::spawn_global`. Or like // it here, collect the values first then use later separately. 
let regions = self diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 6593785ac7b9..08706986fc38 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -462,7 +462,7 @@ impl FlowWorkerManager { shutdown: Option>, ) -> JoinHandle<()> { info!("Starting flownode manager's background task"); - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { self.run(shutdown).await; }) } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index cbe95143d673..ca8e5e0d812e 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -181,7 +181,7 @@ impl servers::server::Server for FlownodeServer { let builder = tonic::transport::Server::builder().add_service(self.create_flow_service()); - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_global(async move { let _result = builder .serve_with_incoming_shutdown(incoming, rx_server.recv().map(drop)) .await diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 5bad9f679f04..9e08af6cc6dd 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -69,7 +69,7 @@ where max_send_message_size: opts.max_send_message_size.as_bytes() as usize, tls: opts.tls.clone(), }; - let builder = GrpcServerBuilder::new(grpc_config, common_runtime::bg_runtime()) + let builder = GrpcServerBuilder::new(grpc_config, common_runtime::global_runtime()) .with_tls_config(opts.tls.clone()) .context(error::InvalidTlsConfigSnafu)?; Ok(builder) @@ -208,7 +208,7 @@ where maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?; let mysql_server = MysqlServer::create_server( - common_runtime::bg_runtime(), + common_runtime::global_runtime(), Arc::new(MysqlSpawnRef::new( ServerSqlQueryHandlerAdapter::arc(instance.clone()), user_provider.clone(), @@ -237,7 +237,7 @@ where ServerSqlQueryHandlerAdapter::arc(instance.clone()), opts.tls.should_force_tls(), tls_server_config, - common_runtime::bg_runtime(), + common_runtime::global_runtime(), user_provider.clone(), )) as Box; diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index c9632e6ea341..d2a210fb4203 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -111,7 +111,7 @@ impl RaftEngineLogStore { fn start(&self) -> Result<()> { self.gc_task - .start(common_runtime::bg_runtime()) + .start(common_runtime::global_runtime()) .context(StartGcTaskSnafu) } @@ -279,7 +279,7 @@ impl LogStore for RaftEngineLogStore { ); let max_batch_size = self.config.read_batch_size; let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size); - let _handle = common_runtime::spawn_read(async move { + let _handle = common_runtime::spawn_global(async move { while start_index <= last_index { let mut vec = Vec::with_capacity(max_batch_size); match engine diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index bb9a9984f8b8..fef7e928a783 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -68,7 +68,7 @@ impl EtcdElection { let leader_ident = leader_value.clone(); let (tx, mut rx) = broadcast::channel(100); - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_global(async move { loop { match rx.recv().await { Ok(msg) => match msg { diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 07a32cc2947f..d74d49027d35 100644 --- a/src/meta-srv/src/handler.rs +++ 
b/src/meta-srv/src/handler.rs @@ -316,7 +316,7 @@ impl HeartbeatMailbox { let mailbox = Arc::new(Self::new(pushers, sequence)); let timeout_checker = mailbox.clone(); - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_global(async move { timeout_checker.check_timeout_bg(10).await; }); diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 4024a77af8ff..f8acdd75c255 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -31,7 +31,7 @@ impl RegionFailureHandler { heartbeat_acceptor: HeartbeatAcceptor, ) -> Self { info!("Starting region supervisor"); - common_runtime::spawn_bg(async move { region_supervisor.run().await }); + common_runtime::spawn_global(async move { region_supervisor.run().await }); Self { heartbeat_acceptor } } } diff --git a/src/meta-srv/src/lock.rs b/src/meta-srv/src/lock.rs index 5eceddac0485..53451591da4d 100644 --- a/src/meta-srv/src/lock.rs +++ b/src/meta-srv/src/lock.rs @@ -89,7 +89,7 @@ impl Drop for DistLockGuard<'_> { if let Some(key) = self.key.take() { let lock = self.lock.clone(); let name = self.name.clone(); - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_global(async move { if let Err(e) = lock.unlock(key).await { error!(e; "Failed to unlock '{}'", String::from_utf8_lossy(&name)); } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index d92798a113b1..138490fb8c04 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -400,7 +400,7 @@ impl Metasrv { leader_cached_kv_backend: leader_cached_kv_backend.clone(), region_supervisor_ticker, }; - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_global(async move { loop { match rx.recv().await { Ok(msg) => { @@ -436,7 +436,7 @@ impl Metasrv { let election = election.clone(); let started = self.started.clone(); let node_info = self.node_info(); - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_global(async move { while started.load(Ordering::Relaxed) { let res = election.register_candidate(&node_info).await; if let Err(e) = res { @@ -450,7 +450,7 @@ impl Metasrv { { let election = election.clone(); let started = self.started.clone(); - let _handle = common_runtime::spawn_write(async move { + let _handle = common_runtime::spawn_global(async move { while started.load(Ordering::Relaxed) { let res = election.campaign().await; if let Err(e) = res { diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 340ef375a691..22b25492e213 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -396,7 +396,7 @@ mod tests { .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { // retry: 0. 
let resp = rx.recv().await.unwrap().unwrap(); let reply_id = resp.mailbox_message.unwrap().id; @@ -445,7 +445,7 @@ mod tests { .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { for _ in 0..3 { let resp = rx.recv().await.unwrap().unwrap(); let reply_id = resp.mailbox_message.unwrap().id; diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index e9080e7fd5ce..73aa4371f467 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -347,7 +347,7 @@ impl RegionMigrationManager { let procedure_id = procedure_with_id.id; info!("Starting region migration procedure {procedure_id} for {task}"); let procedure_manager = self.procedure_manager.clone(); - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { let watcher = &mut match procedure_manager.submit(procedure_with_id).await { Ok(watcher) => watcher, Err(e) => { diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 6cc8ea12a507..1fb63b4a3417 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -291,7 +291,7 @@ pub fn send_mock_reply( mut rx: MockHeartbeatReceiver, msg: impl Fn(u64) -> Result + Send + 'static, ) { - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { while let Some(Ok(resp)) = rx.recv().await { let reply_id = resp.mailbox_message.unwrap().id; mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 5591427d0d2e..5a80cebb6534 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -438,7 +438,7 @@ mod tests { .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) .await; - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { let resp = rx.recv().await.unwrap().unwrap(); let reply_id = resp.mailbox_message.unwrap().id; mailbox @@ -497,7 +497,7 @@ mod tests { .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) .await; - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { let resp = rx.recv().await.unwrap().unwrap(); let reply_id = resp.mailbox_message.unwrap().id; mailbox diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 4f738e38f330..358644c51042 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -46,7 +46,7 @@ impl heartbeat_server::Heartbeat for Metasrv { let (tx, rx) = mpsc::channel(128); let handler_group = self.handler_group().clone(); let ctx = self.new_ctx(); - let _handle = common_runtime::spawn_write(async move { + let _handle = common_runtime::spawn_global(async move { let mut pusher_key = None; while let Some(msg) = in_stream.next().await { let mut is_not_leader = false; diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index cd49af7b1172..f5ba5aaf612d 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ 
-334,7 +334,7 @@ impl Compactor for DefaultCompactor { Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION); for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION { if let Some(task) = futs.pop() { - task_chunk.push(common_runtime::spawn_bg(task)); + task_chunk.push(common_runtime::spawn_global(task)); } } let metas = futures::future::try_join_all(task_chunk) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index d92cd1044ded..d7f2ea034d75 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -337,7 +337,7 @@ impl EngineInner { // Waits for entries distribution. let distribution = - common_runtime::spawn_read(async move { distributor.distribute().await }); + common_runtime::spawn_global(async move { distributor.distribute().await }); // Waits for worker returns. let responses = join_all(responses).await; diff --git a/src/mito2/src/manifest/checkpointer.rs b/src/mito2/src/manifest/checkpointer.rs index c0e5b0d35ac9..c9ca65bfddd6 100644 --- a/src/mito2/src/manifest/checkpointer.rs +++ b/src/mito2/src/manifest/checkpointer.rs @@ -165,7 +165,7 @@ impl Checkpointer { self.inner.set_doing_checkpoint(); let inner = self.inner.clone(); - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { inner.do_checkpoint(checkpoint).await; }); } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index d336dcc2e674..dfe78795cae1 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -665,7 +665,7 @@ impl ScanInput { semaphore: Arc, sender: mpsc::Sender>, ) { - common_runtime::spawn_read(async move { + common_runtime::spawn_global(async move { loop { // We release the permit before sending result to avoid the task waiting on // the channel with the permit held. diff --git a/src/mito2/src/schedule/scheduler.rs b/src/mito2/src/schedule/scheduler.rs index 9c2d5c20ab06..1b2d34cb584c 100644 --- a/src/mito2/src/schedule/scheduler.rs +++ b/src/mito2/src/schedule/scheduler.rs @@ -71,7 +71,7 @@ impl LocalScheduler { let child = token.child_token(); let receiver = rx.clone(); let state_clone = state.clone(); - let handle = common_runtime::spawn_bg(async move { + let handle = common_runtime::spawn_global(async move { while state_clone.load(Ordering::Relaxed) == STATE_RUNNING { tokio::select! 
{ _ = child.cancelled() => { diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs index df49db75b6cc..b83101e3fb62 100644 --- a/src/mito2/src/sst/index/puffin_manager.rs +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -95,7 +95,7 @@ impl PuffinManagerFactory { let tempdir = common_test_util::temp_dir::create_temp_dir(prefix); let f = Self::new(tempdir.path().to_path_buf(), 1024, None); - let factory = common_runtime::block_on_bg(f).unwrap(); + let factory = common_runtime::block_on_global(f).unwrap(); (tempdir, factory) } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 35b65ad0d7bb..c8aa1bb340fb 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -438,7 +438,7 @@ impl WorkerStarter { flush_receiver: self.flush_receiver, stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&self.id.to_string()]), }; - let handle = common_runtime::spawn_write(async move { + let handle = common_runtime::spawn_global(async move { worker_thread.run().await; }); @@ -830,7 +830,7 @@ impl RegionWorkerLoop { ) { if let Some(region) = self.regions.get_region(region_id) { // We need to do this in background as we need the manifest lock. - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { region.set_readonly_gracefully().await; let last_entry_id = region.version_control.current().last_entry_id; diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 06a439cc5ea8..ca1466249759 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -100,7 +100,7 @@ where let object_store = region.access_layer.object_store().clone(); let dropping_regions = self.dropping_regions.clone(); let listener = self.listener.clone(); - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { let gc_duration = listener .on_later_drop_begin(region_id) .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC)); diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 60ace00cd52b..e12f139b5b71 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -57,7 +57,7 @@ impl RegionWorkerLoop { let request_sender = self.sender.clone(); // Now the region is in editing state. // Updates manifest in background. - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { let result = edit_region(®ion, edit.clone()).await; let notify = WorkerRequest::Background { region_id, @@ -125,7 +125,7 @@ impl RegionWorkerLoop { let manifest_ctx = region.manifest_ctx.clone(); // Updates manifest in background. - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { // Write region truncated to manifest. let action_list = RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone())); @@ -167,7 +167,7 @@ impl RegionWorkerLoop { let request_sender = self.sender.clone(); // Now the region is in altering state. 
- common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { let new_meta = change.metadata.clone(); let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change)); diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index d87f531a7220..7fe1d3c322a1 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -113,7 +113,7 @@ impl RegionWorkerLoop { let config = self.config.clone(); let opening_regions = self.opening_regions.clone(); opening_regions.insert_sender(region_id, sender); - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { match opener.open(&config, &wal).await { Ok(region) => { info!("Region {} is opened", region_id); diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index 20c7b5381fa9..ac78350a5089 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -134,7 +134,7 @@ impl Deleter { .map(|(peer, deletes)| { let request = request_factory.build_delete(deletes); let node_manager = self.node_manager.clone(); - common_runtime::spawn_write(async move { + common_runtime::spawn_global(async move { node_manager .datanode(&peer) .await diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 38e79de6c993..4abda29c2eda 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -283,7 +283,7 @@ impl Inserter { let node_manager = self.node_manager.clone(); let flow_tasks = flow_requests.into_iter().map(|(peer, inserts)| { let node_manager = node_manager.clone(); - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { node_manager .flownode(&peer) .await @@ -320,7 +320,7 @@ impl Inserter { .map(|(peer, inserts)| { let node_manager = self.node_manager.clone(); let request = request_factory.build_insert(inserts); - common_runtime::spawn_write(async move { + common_runtime::spawn_global(async move { node_manager .datanode(&peer) .await diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs index 9611bbfab1ff..64a6a75c31a7 100644 --- a/src/operator/src/request.rs +++ b/src/operator/src/request.rs @@ -171,7 +171,7 @@ impl Requester { let request = request_factory.build_request(req_body.clone()); let partition_manager = self.partition_manager.clone(); let node_manager = self.node_manager.clone(); - common_runtime::spawn_write(async move { + common_runtime::spawn_global(async move { let peer = Self::find_region_leader_by_request(partition_manager, &req_body).await?; node_manager diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index b59c72bad709..3d3d2c880407 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -184,7 +184,7 @@ where let reader = accessor.reader(&puffin_file_name).await?; let writer = writer_provider.writer(&file_meta.relative_path).await?; - let task = common_runtime::spawn_read(async move { + let task = common_runtime::spawn_global(async move { let mut file = PuffinFileReader::new(reader); let reader = file.blob_reader(&blob_meta)?; let compression = blob_meta.compression_codec; diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index 9294497a062a..c41df95c2521 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ 
b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -89,7 +89,7 @@ impl BoundedStager { .build(); let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE); - common_runtime::bg_runtime().spawn(Self::delete_routine(rx)); + common_runtime::global_runtime().spawn(Self::delete_routine(rx)); let stager = Self { cache, diff --git a/src/script/src/python/utils.rs b/src/script/src/python/utils.rs index a82b10418d14..4662922f14dc 100644 --- a/src/script/src/python/utils.rs +++ b/src/script/src/python/utils.rs @@ -36,7 +36,7 @@ where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - common_runtime::spawn_blocking_bg(f) + common_runtime::spawn_blocking_global(f) } /// Please only use this method because you are calling from (optionally first as async) to sync then to a async @@ -50,7 +50,7 @@ where F: Future + Send + 'static, T: Send + 'static, { - let rt = common_runtime::bg_runtime(); + let rt = common_runtime::global_runtime(); // spawn a thread to block on the runtime, also should prevent `start a runtime inside of runtime` error // it's ok to block here, assume calling from async to sync is using a `spawn_blocking_*` call std::thread::spawn(move || rt.block_on(f)).join() diff --git a/src/servers/src/export_metrics.rs b/src/servers/src/export_metrics.rs index d71dbcaa88fa..36577d99a643 100644 --- a/src/servers/src/export_metrics.rs +++ b/src/servers/src/export_metrics.rs @@ -165,14 +165,14 @@ impl ExportMetricsTask { msg: "Only `frontend` or `standalone` can use `self_import` as export method." } ); - common_runtime::spawn_bg(write_system_metric_by_handler( + common_runtime::spawn_global(write_system_metric_by_handler( self_import.db.clone(), handler.unwrap(), filter, interval, )) } else if let Some(remote_write) = &self.config.remote_write { - common_runtime::spawn_bg(write_system_metric_by_network( + common_runtime::spawn_global(write_system_metric_by_network( self.headers.clone(), remote_write.url.clone(), filter, diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 1c6d856a5b33..d3198b46a83c 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -227,7 +227,7 @@ impl Server for GrpcServer { let mut serve_state = self.serve_state.lock().await; *serve_state = Some(serve_state_rx); - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_global(async move { let result = builder .serve_with_incoming_shutdown(incoming, rx.map(drop)) .await diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index 038c3371b660..c49a79f29189 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -43,7 +43,7 @@ pub struct FlightRecordBatchStream { impl FlightRecordBatchStream { pub fn new(recordbatches: SendableRecordBatchStream, tracing_context: TracingContext) -> Self { let (tx, rx) = mpsc::channel::>(1); - let join_handle = common_runtime::spawn_read(async move { + let join_handle = common_runtime::spawn_global(async move { Self::flight_data_stream(recordbatches, tx) .trace(tracing_context.attach(info_span!("flight_data_stream"))) .await diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index d4180f5478e7..b145af1cdc27 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -913,7 +913,7 @@ impl Server for HttpServer { let listening = server.local_addr(); info!("HTTP server is bound to {}", listening); - common_runtime::spawn_bg(async move { + common_runtime::spawn_global(async move { if let Err(e) = server 
.with_graceful_shutdown(rx.map(drop)) .await diff --git a/src/servers/src/mysql/server.rs b/src/servers/src/mysql/server.rs index dd875750927a..8dfb6d141544 100644 --- a/src/servers/src/mysql/server.rs +++ b/src/servers/src/mysql/server.rs @@ -218,7 +218,7 @@ impl Server for MysqlServer { let (stream, addr) = self.base_server.bind(listening).await?; let io_runtime = self.base_server.io_runtime(); - let join_handle = common_runtime::spawn_read(self.accept(io_runtime, stream)); + let join_handle = common_runtime::spawn_global(self.accept(io_runtime, stream)); self.base_server.start_with(join_handle).await?; Ok(addr) } diff --git a/src/servers/src/postgres/server.rs b/src/servers/src/postgres/server.rs index ff64732ca17e..cca9c43181cc 100644 --- a/src/servers/src/postgres/server.rs +++ b/src/servers/src/postgres/server.rs @@ -124,7 +124,7 @@ impl Server for PostgresServer { let (stream, addr) = self.base_server.bind(listening).await?; let io_runtime = self.base_server.io_runtime(); - let join_handle = common_runtime::spawn_read(self.accept(io_runtime, stream)); + let join_handle = common_runtime::spawn_global(self.accept(io_runtime, stream)); self.base_server.start_with(join_handle).await?; Ok(addr) diff --git a/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs index d9dc4ba6fa0f..5eaf43ab0a8e 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs +++ b/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs @@ -286,7 +286,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + common_runtime::block_on_global(async { let Connections { mysql } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), diff --git a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs index 3034ce5c095d..5fab5f260333 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs +++ b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs @@ -341,7 +341,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + common_runtime::block_on_global(async { let Connections { mysql } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), diff --git a/tests-fuzz/targets/fuzz_alter_logical_table.rs b/tests-fuzz/targets/fuzz_alter_logical_table.rs index 39e671ca6efb..3ceb5b8b4572 100644 --- a/tests-fuzz/targets/fuzz_alter_logical_table.rs +++ b/tests-fuzz/targets/fuzz_alter_logical_table.rs @@ -231,7 +231,7 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + common_runtime::block_on_global(async { let Connections { mysql } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), diff --git a/tests-fuzz/targets/fuzz_alter_table.rs b/tests-fuzz/targets/fuzz_alter_table.rs index 7fd8f7d7ac83..8c17612d0443 100644 --- a/tests-fuzz/targets/fuzz_alter_table.rs +++ 
b/tests-fuzz/targets/fuzz_alter_table.rs @@ -190,7 +190,7 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + common_runtime::block_on_global(async { let Connections { mysql } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), diff --git a/tests-fuzz/targets/fuzz_create_database.rs b/tests-fuzz/targets/fuzz_create_database.rs index 7fd3f1c3d9c8..3d052bb3d9d3 100644 --- a/tests-fuzz/targets/fuzz_create_database.rs +++ b/tests-fuzz/targets/fuzz_create_database.rs @@ -94,7 +94,7 @@ async fn execute_create_database(ctx: FuzzContext, input: FuzzInput) -> Result<( fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + common_runtime::block_on_global(async { let Connections { mysql } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), diff --git a/tests-fuzz/targets/fuzz_create_logical_table.rs b/tests-fuzz/targets/fuzz_create_logical_table.rs index c54b8f9ab7ca..39251732f2db 100644 --- a/tests-fuzz/targets/fuzz_create_logical_table.rs +++ b/tests-fuzz/targets/fuzz_create_logical_table.rs @@ -183,7 +183,7 @@ async fn execute_create_logic_table(ctx: FuzzContext, input: FuzzInput) -> Resul fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + common_runtime::block_on_global(async { let Connections { mysql } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), diff --git a/tests-fuzz/targets/fuzz_create_table.rs b/tests-fuzz/targets/fuzz_create_table.rs index 9411afa1a3a7..0d2458c15eb0 100644 --- a/tests-fuzz/targets/fuzz_create_table.rs +++ b/tests-fuzz/targets/fuzz_create_table.rs @@ -113,7 +113,7 @@ async fn execute_create_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + common_runtime::block_on_global(async { let Connections { mysql } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), diff --git a/tests-fuzz/targets/fuzz_insert.rs b/tests-fuzz/targets/fuzz_insert.rs index a712701b8533..45eec19e1bc3 100644 --- a/tests-fuzz/targets/fuzz_insert.rs +++ b/tests-fuzz/targets/fuzz_insert.rs @@ -209,7 +209,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + common_runtime::block_on_global(async { let Connections { mysql } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), diff --git a/tests-fuzz/targets/fuzz_insert_logical_table.rs b/tests-fuzz/targets/fuzz_insert_logical_table.rs index 22226dbb9de1..cde0a87fe2ca 100644 --- a/tests-fuzz/targets/fuzz_insert_logical_table.rs +++ b/tests-fuzz/targets/fuzz_insert_logical_table.rs @@ -287,7 +287,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + 
common_runtime::block_on_global(async { let Connections { mysql } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), diff --git a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs index bbc4b7297d96..271f04c143b4 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs @@ -306,7 +306,7 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result< fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + common_runtime::block_on_global(async { let Connections { mysql } = init_greptime_connections_via_env().await; let ctx = FuzzContext { greptime: mysql.expect("mysql connection init must be succeed"), diff --git a/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs b/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs index 662ca0c67256..103501ac7a35 100644 --- a/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs +++ b/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs @@ -214,7 +214,7 @@ async fn validate_columns(client: &Pool, schema_name: &str, table_ctx: &T fuzz_target!(|input: FuzzInput| { common_telemetry::init_default_ut_logging(); - common_runtime::block_on_write(async { + common_runtime::block_on_global(async { let variables = load_unstable_test_env_variables(); let root_dir = variables.root_dir.unwrap_or(DEFAULT_ROOT_DIR.to_string()); create_dir_all(&root_dir).unwrap(); From d7c3d7647bf0b7fcb62f547ce78b562dd35eadd7 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 25 Jul 2024 21:33:41 +0800 Subject: [PATCH 3/8] spawn compact task into compact_runtime Signed-off-by: Ruihang Xia --- src/mito2/src/compaction/compactor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index f5ba5aaf612d..9c8c0e02bd1e 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -334,7 +334,7 @@ impl Compactor for DefaultCompactor { Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION); for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION { if let Some(task) = futs.pop() { - task_chunk.push(common_runtime::spawn_global(task)); + task_chunk.push(common_runtime::spawn_compact(task)); } } let metas = futures::future::try_join_all(task_chunk) From 4c39ddb8f46f7556d0952f2efc42b52972efeea9 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 26 Jul 2024 11:13:24 +0800 Subject: [PATCH 4/8] refine naming Signed-off-by: Ruihang Xia --- src/common/runtime/src/global.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs index a53ca1e893c9..b7d78badeb41 100644 --- a/src/common/runtime/src/global.rs +++ b/src/common/runtime/src/global.rs @@ -147,7 +147,7 @@ pub fn init_global_runtimes(options: &RuntimeOptions) { "compact-worker", options.compact_rt_size, )); - c.hb_runtime = Some(create_runtime("global-hb", "global-hb-worker", HB_WORKERS)); + c.hb_runtime = Some(create_runtime("heartbeat", "hb-worker", HB_WORKERS)); }); } From c29bdfed0c4c4552f9cecb18af715d3e1925603c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 26 Jul 2024 11:19:22 +0800 Subject: [PATCH 5/8] Update src/servers/tests/mysql/mysql_server_test.rs Co-authored-by:
Zhenchi --- src/servers/tests/mysql/mysql_server_test.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs index 9e65dc861e1e..35286a413ef6 100644 --- a/src/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ -46,8 +46,7 @@ struct MysqlOpts<'a> { } fn create_mysql_server(table: TableRef, opts: MysqlOpts<'_>) -> Result> { - let query_handler: Arc + Send + Sync> = - create_testing_sql_query_handler(table); +let query_handler = create_testing_sql_query_handler(table); let io_runtime = RuntimeBuilder::default() .worker_threads(4) .thread_name("mysql-io-handlers") From b9aafda1b1f5e7f9526fb513e6a118ff541db830 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 26 Jul 2024 11:18:29 +0800 Subject: [PATCH 6/8] fix clippy Signed-off-by: Ruihang Xia --- src/servers/tests/mysql/mysql_server_test.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs index 35286a413ef6..e077409a62df 100644 --- a/src/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ -29,7 +29,6 @@ use rand::rngs::StdRng; use rand::Rng; use servers::error::Result; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; -use servers::query_handler::sql::SqlQueryHandler; use servers::server::Server; use servers::tls::{ReloadableTlsServerConfig, TlsOption}; use table::test_util::MemTable; @@ -46,7 +45,7 @@ struct MysqlOpts<'a> { } fn create_mysql_server(table: TableRef, opts: MysqlOpts<'_>) -> Result> { -let query_handler = create_testing_sql_query_handler(table); + let query_handler = create_testing_sql_query_handler(table); let io_runtime = RuntimeBuilder::default() .worker_threads(4) .thread_name("mysql-io-handlers") From 2bac2f4d2b8d5d6f7ba239d71adf6c1042dbf265 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 29 Jul 2024 15:27:39 +0800 Subject: [PATCH 7/8] turn off fuzz test matrix fail-fast option Signed-off-by: Ruihang Xia --- .github/workflows/develop.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 875b59c768ce..4a3982b585fd 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -141,6 +141,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 60 strategy: + fail-fast: false matrix: target: [ "fuzz_create_table", "fuzz_alter_table", "fuzz_create_database", "fuzz_create_logical_table", "fuzz_alter_logical_table", "fuzz_insert", "fuzz_insert_logical_table" ] steps: From 30048aa04d40158cb074ba24799f35d9f6f375e0 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 29 Jul 2024 17:02:46 +0000 Subject: [PATCH 8/8] chore: update rt config for ci tests --- .../setup-greptimedb-cluster/with-disk.yaml | 13 ++++--------- .../with-minio-and-cache.yaml | 15 +++++---------- .../setup-greptimedb-cluster/with-minio.yaml | 13 ++++--------- .../setup-greptimedb-cluster/with-remote-wal.yaml | 13 ++++--------- 4 files changed, 17 insertions(+), 37 deletions(-) diff --git a/.github/actions/setup-greptimedb-cluster/with-disk.yaml b/.github/actions/setup-greptimedb-cluster/with-disk.yaml index 2b5b85547651..1cbd22dbba6f 100644 --- a/.github/actions/setup-greptimedb-cluster/with-disk.yaml +++ b/.github/actions/setup-greptimedb-cluster/with-disk.yaml @@ -1,18 +1,13 @@ meta: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 -
bg_rt_size = 8 + global_rt_size = 4 datanode: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 - bg_rt_size = 8 + global_rt_size = 4 + compact_rt_size = 2 frontend: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 - bg_rt_size = 8 + global_rt_size = 4 \ No newline at end of file diff --git a/.github/actions/setup-greptimedb-cluster/with-minio-and-cache.yaml b/.github/actions/setup-greptimedb-cluster/with-minio-and-cache.yaml index acf99adf26d2..fc89bd542253 100644 --- a/.github/actions/setup-greptimedb-cluster/with-minio-and-cache.yaml +++ b/.github/actions/setup-greptimedb-cluster/with-minio-and-cache.yaml @@ -1,19 +1,16 @@ meta: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 - bg_rt_size = 8 - + global_rt_size = 4 + [datanode] [datanode.client] timeout = "60s" datanode: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 - bg_rt_size = 8 + global_rt_size = 4 + compact_rt_size = 2 [storage] cache_path = "/data/greptimedb/s3cache" @@ -21,9 +18,7 @@ datanode: frontend: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 - bg_rt_size = 8 + global_rt_size = 4 [meta_client] ddl_timeout = "60s" diff --git a/.github/actions/setup-greptimedb-cluster/with-minio.yaml b/.github/actions/setup-greptimedb-cluster/with-minio.yaml index d5ddcddba56a..b0b1c6b757e4 100644 --- a/.github/actions/setup-greptimedb-cluster/with-minio.yaml +++ b/.github/actions/setup-greptimedb-cluster/with-minio.yaml @@ -1,9 +1,7 @@ meta: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 - bg_rt_size = 8 + global_rt_size = 4 [datanode] [datanode.client] @@ -11,15 +9,12 @@ meta: datanode: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 - bg_rt_size = 8 + global_rt_size = 4 + compact_rt_size = 2 frontend: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 - bg_rt_size = 8 + global_rt_size = 4 [meta_client] ddl_timeout = "60s" diff --git a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml index bf4d3da65c7b..e5fc71cfe382 100644 --- a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml +++ b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml @@ -1,9 +1,7 @@ meta: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 - bg_rt_size = 8 + global_rt_size = 4 [wal] provider = "kafka" @@ -17,9 +15,8 @@ meta: datanode: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 - bg_rt_size = 8 + global_rt_size = 4 + compact_rt_size = 2 [wal] provider = "kafka" @@ -28,9 +25,7 @@ datanode: frontend: config: |- [runtime] - read_rt_size = 8 - write_rt_size = 8 - bg_rt_size = 8 + global_rt_size = 4 [meta_client] ddl_timeout = "60s"
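
Taken together, the series replaces the per-purpose read/write/background runtimes with a shared global runtime plus a dedicated compact runtime, and the [runtime] config section shrinks to global_rt_size and compact_rt_size as in the CI manifests above. The sketch below is illustrative only and not part of the patches: it assumes the common_runtime helpers touched in these hunks (spawn_global, spawn_compact, block_on_global) return awaitable tokio join handles, as the try_join_all call in the compactor hunk implies, and that runtime sizes come from the same RuntimeOptions consumed by init_global_runtimes in PATCH 4/8.

    fn main() {
        // Fuzz targets and tests block on the shared global runtime instead of
        // the former write runtime.
        common_runtime::block_on_global(async {
            // General-purpose background work is spawned onto the global runtime...
            let bg = common_runtime::spawn_global(async move { 1 + 1 });

            // ...while compaction jobs go to the dedicated compact runtime
            // (PATCH 3/8), keeping heavy merges off the shared worker pool.
            let compact = common_runtime::spawn_compact(async move { "done" });

            // Both helpers yield join handles that can be awaited individually
            // or collected with futures::future::try_join_all.
            let _ = (bg.await, compact.await);
        });
    }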