Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove dedicated runtime for grpc, mysql and pg protocols #4436

Merged
merged 13 commits into from
Jul 30, 2024
1 change: 1 addition & 0 deletions .github/workflows/develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: [ "fuzz_create_table", "fuzz_alter_table", "fuzz_create_database", "fuzz_create_logical_table", "fuzz_alter_logical_table", "fuzz_insert", "fuzz_insert_logical_table" ]
steps:
Expand Down
20 changes: 8 additions & 12 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `default_timezone` | String | `None` | The default timezone of the server. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
Expand Down Expand Up @@ -170,9 +169,8 @@
| --- | -----| ------- | ----------- |
| `default_timezone` | String | `None` | The default timezone of the server. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `heartbeat` | -- | -- | The heartbeat options. |
| `heartbeat.interval` | String | `18s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
Expand Down Expand Up @@ -262,9 +260,8 @@
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
Expand Down Expand Up @@ -338,9 +335,8 @@
| `grpc.tls.key_path` | String | `None` | Private key file path. |
| `grpc.tls.watch` | Bool | `false` | Watch for Certificate and key file change and auto reload.<br/>For now, gRPC tls config does not support auto reload. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `heartbeat` | -- | -- | The heartbeat options. |
| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
Expand Down
6 changes: 2 additions & 4 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,9 @@ watch = false
## The runtime options.
[runtime]
## The number of threads to execute the runtime for global read operations.
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
read_rt_size = 8
global_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 4
compact_rt_size = 4

## The heartbeat options.
[heartbeat]
Expand Down
6 changes: 2 additions & 4 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ default_timezone = "UTC"
## The runtime options.
[runtime]
## The number of threads to execute the runtime for global read operations.
read_rt_size = 8
global_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 4
compact_rt_size = 4

## The heartbeat options.
[heartbeat]
Expand Down
6 changes: 2 additions & 4 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ enable_region_failover = false
## The runtime options.
[runtime]
## The number of threads to execute the runtime for global read operations.
read_rt_size = 8
global_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 4
compact_rt_size = 4

## Procedure storage options.
[procedure]
Expand Down
6 changes: 2 additions & 4 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ default_timezone = "UTC"
## The runtime options.
[runtime]
## The number of threads to execute the runtime for global read operations.
read_rt_size = 8
global_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 4
compact_rt_size = 4

## The HTTP server options.
[http]
Expand Down
2 changes: 1 addition & 1 deletion docs/how-to/how-to-write-fuzz-tests.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ use tests_fuzz::utils::{init_greptime_connections, Connections};

fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
common_runtime::block_on_write(async {
common_runtime::block_on_global(async {
let Connections { mysql } = init_greptime_connections().await;
let mut rng = ChaChaRng::seed_from_u64(input.seed);
let columns = rng.gen_range(2..30);
Expand Down
20 changes: 8 additions & 12 deletions src/cmd/tests/load_config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ fn test_load_datanode_example_config() {

let expected = GreptimeOptions::<DatanodeOptions> {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 4,
global_rt_size: 8,
compact_rt_size: 4,
},
component: DatanodeOptions {
node_id: Some(42),
Expand Down Expand Up @@ -119,9 +118,8 @@ fn test_load_frontend_example_config() {
.unwrap();
let expected = GreptimeOptions::<FrontendOptions> {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 4,
global_rt_size: 8,
compact_rt_size: 4,
},
component: FrontendOptions {
default_timezone: Some("UTC".to_string()),
Expand Down Expand Up @@ -167,9 +165,8 @@ fn test_load_metasrv_example_config() {
.unwrap();
let expected = GreptimeOptions::<MetasrvOptions> {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 4,
global_rt_size: 8,
compact_rt_size: 4,
},
component: MetasrvOptions {
selector: SelectorType::LeaseBased,
Expand Down Expand Up @@ -200,9 +197,8 @@ fn test_load_standalone_example_config() {
.unwrap();
let expected = GreptimeOptions::<StandaloneOptions> {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 4,
global_rt_size: 8,
compact_rt_size: 4,
},
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
Expand Down
2 changes: 1 addition & 1 deletion src/common/datasource/src/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl FileFormat for CsvFormat {
let schema_infer_max_record = self.schema_infer_max_record;
let has_header = self.has_header;

common_runtime::spawn_blocking_read(move || {
common_runtime::spawn_blocking_global(move || {
let reader = SyncIoBridge::new(decoded);

let (schema, _records_read) =
Expand Down
2 changes: 1 addition & 1 deletion src/common/datasource/src/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl FileFormat for JsonFormat {

let schema_infer_max_record = self.schema_infer_max_record;

common_runtime::spawn_blocking_read(move || {
common_runtime::spawn_blocking_global(move || {
let mut reader = BufReader::new(SyncIoBridge::new(decoded));

let iter = ValueIter::new(&mut reader, schema_infer_max_record);
Expand Down
2 changes: 1 addition & 1 deletion src/common/greptimedb-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl GreptimeDBTelemetryTask {
match self {
GreptimeDBTelemetryTask::Enable((task, _)) => {
print_anonymous_usage_data_disclaimer();
task.start(common_runtime::bg_runtime())
task.start(common_runtime::global_runtime())
}
GreptimeDBTelemetryTask::Disable => Ok(()),
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/grpc/src/channel_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl ChannelManager {
}

let pool = self.pool.clone();
let _handle = common_runtime::spawn_bg(async {
let _handle = common_runtime::spawn_global(async {
recycle_channel_in_loop(pool, RECYCLE_CHANNEL_INTERVAL_SECS).await;
});
info!(
Expand Down
4 changes: 2 additions & 2 deletions src/common/macro/src/admin_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ fn build_struct(
.create_mutable_vector(rows_num);

if columns_num == 0 {
let result = common_runtime::block_on_read(async move {
waynexia marked this conversation as resolved.
Show resolved Hide resolved
let result = common_runtime::block_on_global(async move {
#fn_name(handler, query_ctx, &[]).await
})?;

Expand All @@ -239,7 +239,7 @@ fn build_struct(
.map(|vector| vector.get_ref(i))
.collect();

let result = common_runtime::block_on_read(async move {
let result = common_runtime::block_on_global(async move {
#fn_name(handler, query_ctx, &args).await
})?;

Expand Down
4 changes: 2 additions & 2 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ impl LocalManager {

let tracing_context = TracingContext::from_current_span();

let _handle = common_runtime::spawn_bg(async move {
let _handle = common_runtime::spawn_global(async move {
// Run the root procedure.
// The task was moved to another runtime for execution.
// In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
Expand Down Expand Up @@ -593,7 +593,7 @@ impl ProcedureManager for LocalManager {
let task_inner = self.build_remove_outdated_meta_task();

task_inner
.start(common_runtime::bg_runtime())
.start(common_runtime::global_runtime())
.context(StartRemoveOutdatedMetaTaskSnafu)?;

*task = Some(task_inner);
Expand Down
2 changes: 1 addition & 1 deletion src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl Runner {
// Add the id of the subprocedure to the metadata.
self.meta.push_child(procedure_id);

let _handle = common_runtime::spawn_bg(async move {
let _handle = common_runtime::spawn_global(async move {
// Run the root procedure.
runner.run().await
});
Expand Down
Loading
Loading