From 1410e14ed9e099f47bf25724950a213282e72ac2 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 5 Dec 2024 06:46:21 +0000 Subject: [PATCH] refactor: relocate CLI to a dedicated directory --- Cargo.lock | 57 ++++ Cargo.toml | 2 + src/catalog/src/information_extension.rs | 92 +++++ src/catalog/src/lib.rs | 1 + src/cli/Cargo.toml | 65 ++++ src/{cmd/src/cli => cli/src}/bench.rs | 7 +- .../src/cli => cli/src}/bench/metadata.rs | 2 +- src/{cmd/src/cli => cli/src}/cmd.rs | 0 src/{cmd/src/cli => cli/src}/database.rs | 0 src/cli/src/error.rs | 316 ++++++++++++++++++ src/{cmd/src/cli => cli/src}/export.rs | 109 +----- src/{cmd/src/cli => cli/src}/helper.rs | 2 +- src/{cmd/src/cli => cli/src}/import.rs | 26 +- src/cli/src/lib.rs | 58 ++++ src/{cmd/src/cli => cli/src}/repl.rs | 8 +- src/cmd/Cargo.toml | 1 + src/cmd/src/cli.rs | 155 +++++---- src/cmd/src/error.rs | 16 + src/cmd/src/flownode.rs | 3 +- src/cmd/src/frontend.rs | 3 +- src/cmd/src/lib.rs | 76 ----- src/plugins/Cargo.toml | 2 + src/plugins/src/cli.rs | 36 ++ src/plugins/src/lib.rs | 2 + tests-integration/src/cluster.rs | 2 +- 25 files changed, 779 insertions(+), 262 deletions(-) create mode 100644 src/catalog/src/information_extension.rs create mode 100644 src/cli/Cargo.toml rename src/{cmd/src/cli => cli/src}/bench.rs (95%) rename src/{cmd/src/cli => cli/src}/bench/metadata.rs (99%) rename src/{cmd/src/cli => cli/src}/cmd.rs (100%) rename src/{cmd/src/cli => cli/src}/database.rs (100%) create mode 100644 src/cli/src/error.rs rename src/{cmd/src/cli => cli/src}/export.rs (83%) rename src/{cmd/src/cli => cli/src}/helper.rs (99%) rename src/{cmd/src/cli => cli/src}/import.rs (92%) create mode 100644 src/cli/src/lib.rs rename src/{cmd/src/cli => cli/src}/repl.rs (98%) create mode 100644 src/plugins/src/cli.rs diff --git a/Cargo.lock b/Cargo.lock index 8ec39f71f7c3..b41580c74bdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1712,6 +1712,60 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" +[[package]] +name = "cli" +version = "0.11.0" +dependencies = [ + "async-trait", + "auth", + "base64 0.21.7", + "cache", + "catalog", + "chrono", + "clap 4.5.19", + "client", + "common-base", + "common-catalog", + "common-config", + "common-error", + "common-grpc", + "common-macro", + "common-meta", + "common-options", + "common-procedure", + "common-query", + "common-recordbatch", + "common-runtime", + "common-telemetry", + "common-test-util", + "common-time", + "common-version", + "common-wal", + "datatypes", + "either", + "etcd-client", + "futures", + "humantime", + "meta-client", + "nu-ansi-term", + "query", + "rand", + "reqwest", + "rustyline 10.1.1", + "serde", + "serde_json", + "servers", + "session", + "snafu 0.8.5", + "store-api", + "substrait 0.11.0", + "table", + "temp-env", + "tempfile", + "tokio", + "tracing-appender", +] + [[package]] name = "client" version = "0.11.0" @@ -1793,6 +1847,7 @@ dependencies = [ "catalog", "chrono", "clap 4.5.19", + "cli", "client", "common-base", "common-catalog", @@ -8348,6 +8403,8 @@ name = "plugins" version = "0.11.0" dependencies = [ "auth", + "clap 4.5.19", + "cli", "common-base", "datanode", "frontend", diff --git a/Cargo.toml b/Cargo.toml index 73db80c4c858..4cc07cd89818 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "src/auth", "src/cache", "src/catalog", + "src/cli", "src/client", "src/cmd", "src/common/base", @@ -200,6 +201,7 @@ api = { path = "src/api" } auth = { path = "src/auth" } cache = { path = "src/cache" } catalog = { path = "src/catalog" } +cli = { path = "src/cli" } client = { path = "src/client" } cmd = { path = "src/cmd", default-features = false } common-base = { path = "src/common/base" } diff --git a/src/catalog/src/information_extension.rs b/src/catalog/src/information_extension.rs new file mode 100644 index 000000000000..55764557a326 --- /dev/null +++ b/src/catalog/src/information_extension.rs @@ -0,0 +1,92 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::ProcedureStatus; +use common_error::ext::BoxedError; +use common_meta::cluster::{ClusterInfo, NodeInfo}; +use common_meta::datanode::RegionStat; +use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; +use common_meta::rpc::procedure; +use common_procedure::{ProcedureInfo, ProcedureState}; +use meta_client::MetaClientRef; +use snafu::ResultExt; + +use crate::error; +use crate::information_schema::InformationExtension; + +pub struct DistributedInformationExtension { + meta_client: MetaClientRef, +} + +impl DistributedInformationExtension { + pub fn new(meta_client: MetaClientRef) -> Self { + Self { meta_client } + } +} + +#[async_trait::async_trait] +impl InformationExtension for DistributedInformationExtension { + type Error = crate::error::Error; + + async fn nodes(&self) -> std::result::Result, Self::Error> { + self.meta_client + .list_nodes(None) + .await + .map_err(BoxedError::new) + .context(error::ListNodesSnafu) + } + + async fn procedures(&self) -> std::result::Result, Self::Error> { + let procedures = self + .meta_client + .list_procedures(&ExecutorContext::default()) + .await + .map_err(BoxedError::new) + .context(error::ListProceduresSnafu)? + .procedures; + let mut result = Vec::with_capacity(procedures.len()); + for procedure in procedures { + let pid = match procedure.id { + Some(pid) => pid, + None => return error::ProcedureIdNotFoundSnafu {}.fail(), + }; + let pid = procedure::pb_pid_to_pid(&pid) + .map_err(BoxedError::new) + .context(error::ConvertProtoDataSnafu)?; + let status = ProcedureStatus::try_from(procedure.status) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown") + .to_string(); + let procedure_info = ProcedureInfo { + id: pid, + type_name: procedure.type_name, + start_time_ms: procedure.start_time_ms, + end_time_ms: procedure.end_time_ms, + state: ProcedureState::Running, + lock_keys: procedure.lock_keys, + }; + result.push((status, procedure_info)); + } + + Ok(result) + } + + async fn region_stats(&self) -> std::result::Result, Self::Error> { + self.meta_client + .list_region_stats() + .await + .map_err(BoxedError::new) + .context(error::ListRegionStatsSnafu) + } +} diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 3444c0e089e6..623f2a363e6d 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -30,6 +30,7 @@ use table::TableRef; use crate::error::Result; pub mod error; +pub mod information_extension; pub mod kvbackend; pub mod memory; mod metrics; diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml new file mode 100644 index 000000000000..b49aa00ee2cc --- /dev/null +++ b/src/cli/Cargo.toml @@ -0,0 +1,65 @@ +[package] +name = "cli" +version.workspace = true +edition.workspace = true +license.workspace = true + +[lints] +workspace = true + +[dependencies] +async-trait.workspace = true +auth.workspace = true +base64.workspace = true +cache.workspace = true +catalog.workspace = true +chrono.workspace = true +clap.workspace = true +client.workspace = true +common-base.workspace = true +common-catalog.workspace = true +common-config.workspace = true +common-error.workspace = true +common-grpc.workspace = true +common-macro.workspace = true +common-meta.workspace = true +common-options.workspace = true +common-procedure.workspace = true +common-query.workspace = true +common-recordbatch.workspace = true +common-runtime.workspace = true +common-telemetry = { workspace = true, features = [ + "deadlock_detection", +] } +common-time.workspace = true +common-version.workspace = true +common-wal.workspace = true +datatypes.workspace = true +either = "1.8" +etcd-client.workspace = true +futures.workspace = true +humantime.workspace = true +meta-client.workspace = true +nu-ansi-term = "0.46" +query.workspace = true +rand.workspace = true +reqwest.workspace = true +rustyline = "10.1" +serde.workspace = true +serde_json.workspace = true +servers.workspace = true +session.workspace = true +snafu.workspace = true +store-api.workspace = true +substrait.workspace = true +table.workspace = true +tokio.workspace = true +tracing-appender.workspace = true + +[dev-dependencies] +client = { workspace = true, features = ["testing"] } +common-test-util.workspace = true +common-version.workspace = true +serde.workspace = true +temp-env = "0.3" +tempfile.workspace = true diff --git a/src/cmd/src/cli/bench.rs b/src/cli/src/bench.rs similarity index 95% rename from src/cmd/src/cli/bench.rs rename to src/cli/src/bench.rs index f3d1d0f8097f..88d0df9d50d8 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cli/src/bench.rs @@ -30,11 +30,10 @@ use rand::Rng; use store_api::storage::RegionNumber; use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType}; use table::table_name::TableName; -use tracing_appender::non_blocking::WorkerGuard; use self::metadata::TableMetadataBencher; -use crate::cli::{Instance, Tool}; use crate::error::Result; +use crate::Tool; mod metadata; @@ -62,7 +61,7 @@ pub struct BenchTableMetadataCommand { } impl BenchTableMetadataCommand { - pub async fn build(&self, guard: Vec) -> Result { + pub async fn build(&self) -> Result> { let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128) .await .unwrap(); @@ -73,7 +72,7 @@ impl BenchTableMetadataCommand { table_metadata_manager, count: self.count, }; - Ok(Instance::new(Box::new(tool), guard)) + Ok(Box::new(tool)) } } diff --git a/src/cmd/src/cli/bench/metadata.rs b/src/cli/src/bench/metadata.rs similarity index 99% rename from src/cmd/src/cli/bench/metadata.rs rename to src/cli/src/bench/metadata.rs index 9229b0342e88..28343232a1db 100644 --- a/src/cmd/src/cli/bench/metadata.rs +++ b/src/cli/src/bench/metadata.rs @@ -18,7 +18,7 @@ use common_meta::key::table_route::TableRouteValue; use common_meta::key::TableMetadataManagerRef; use table::table_name::TableName; -use crate::cli::bench::{ +use crate::bench::{ bench_self_recorded, create_region_routes, create_region_wal_options, create_table_info, }; diff --git a/src/cmd/src/cli/cmd.rs b/src/cli/src/cmd.rs similarity index 100% rename from src/cmd/src/cli/cmd.rs rename to src/cli/src/cmd.rs diff --git a/src/cmd/src/cli/database.rs b/src/cli/src/database.rs similarity index 100% rename from src/cmd/src/cli/database.rs rename to src/cli/src/database.rs diff --git a/src/cli/src/error.rs b/src/cli/src/error.rs new file mode 100644 index 000000000000..bf0b6342c1f9 --- /dev/null +++ b/src/cli/src/error.rs @@ -0,0 +1,316 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use rustyline::error::ReadlineError; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Failed to install ring crypto provider: {}", msg))] + InitTlsProvider { + #[snafu(implicit)] + location: Location, + msg: String, + }, + #[snafu(display("Failed to create default catalog and schema"))] + InitMetadata { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to init DDL manager"))] + InitDdlManager { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to init default timezone"))] + InitTimezone { + #[snafu(implicit)] + location: Location, + source: common_time::error::Error, + }, + + #[snafu(display("Failed to start procedure manager"))] + StartProcedureManager { + #[snafu(implicit)] + location: Location, + source: common_procedure::error::Error, + }, + + #[snafu(display("Failed to stop procedure manager"))] + StopProcedureManager { + #[snafu(implicit)] + location: Location, + source: common_procedure::error::Error, + }, + + #[snafu(display("Failed to start wal options allocator"))] + StartWalOptionsAllocator { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Missing config, msg: {}", msg))] + MissingConfig { + msg: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Illegal config: {}", msg))] + IllegalConfig { + msg: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Invalid REPL command: {reason}"))] + InvalidReplCommand { reason: String }, + + #[snafu(display("Cannot create REPL"))] + ReplCreation { + #[snafu(source)] + error: ReadlineError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Error reading command"))] + Readline { + #[snafu(source)] + error: ReadlineError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to request database, sql: {sql}"))] + RequestDatabase { + sql: String, + #[snafu(source)] + source: client::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to collect RecordBatches"))] + CollectRecordBatches { + #[snafu(implicit)] + location: Location, + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Failed to pretty print Recordbatches"))] + PrettyPrintRecordBatches { + #[snafu(implicit)] + location: Location, + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Failed to start Meta client"))] + StartMetaClient { + #[snafu(implicit)] + location: Location, + source: meta_client::error::Error, + }, + + #[snafu(display("Failed to parse SQL: {}", sql))] + ParseSql { + sql: String, + #[snafu(implicit)] + location: Location, + source: query::error::Error, + }, + + #[snafu(display("Failed to plan statement"))] + PlanStatement { + #[snafu(implicit)] + location: Location, + source: query::error::Error, + }, + + #[snafu(display("Failed to encode logical plan in substrait"))] + SubstraitEncodeLogicalPlan { + #[snafu(implicit)] + location: Location, + source: substrait::error::Error, + }, + + #[snafu(display("Failed to load layered config"))] + LoadLayeredConfig { + #[snafu(source(from(common_config::error::Error, Box::new)))] + source: Box, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to connect to Etcd at {etcd_addr}"))] + ConnectEtcd { + etcd_addr: String, + #[snafu(source)] + error: etcd_client::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to serde json"))] + SerdeJson { + #[snafu(source)] + error: serde_json::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to run http request: {reason}"))] + HttpQuerySql { + reason: String, + #[snafu(source)] + error: reqwest::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Empty result from output"))] + EmptyResult { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to manipulate file"))] + FileIo { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: std::io::Error, + }, + + #[snafu(display("Failed to create directory {}", dir))] + CreateDir { + dir: String, + #[snafu(source)] + error: std::io::Error, + }, + + #[snafu(display("Failed to spawn thread"))] + SpawnThread { + #[snafu(source)] + error: std::io::Error, + }, + + #[snafu(display("Other error"))] + Other { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to build runtime"))] + BuildRuntime { + #[snafu(implicit)] + location: Location, + source: common_runtime::error::Error, + }, + + #[snafu(display("Failed to get cache from cache registry: {}", name))] + CacheRequired { + #[snafu(implicit)] + location: Location, + name: String, + }, + + #[snafu(display("Failed to build cache registry"))] + BuildCacheRegistry { + #[snafu(implicit)] + location: Location, + source: cache::error::Error, + }, + + #[snafu(display("Failed to initialize meta client"))] + MetaClientInit { + #[snafu(implicit)] + location: Location, + source: meta_client::error::Error, + }, + + #[snafu(display("Cannot find schema {schema} in catalog {catalog}"))] + SchemaNotFound { + catalog: String, + schema: String, + #[snafu(implicit)] + location: Location, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => { + source.status_code() + } + + Error::MissingConfig { .. } + | Error::LoadLayeredConfig { .. } + | Error::IllegalConfig { .. } + | Error::InvalidReplCommand { .. } + | Error::InitTimezone { .. } + | Error::ConnectEtcd { .. } + | Error::CreateDir { .. } + | Error::EmptyResult { .. } => StatusCode::InvalidArguments, + + Error::StartProcedureManager { source, .. } + | Error::StopProcedureManager { source, .. } => source.status_code(), + Error::StartWalOptionsAllocator { source, .. } => source.status_code(), + Error::ReplCreation { .. } | Error::Readline { .. } | Error::HttpQuerySql { .. } => { + StatusCode::Internal + } + Error::RequestDatabase { source, .. } => source.status_code(), + Error::CollectRecordBatches { source, .. } + | Error::PrettyPrintRecordBatches { source, .. } => source.status_code(), + Error::StartMetaClient { source, .. } => source.status_code(), + Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => { + source.status_code() + } + Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(), + + Error::SerdeJson { .. } + | Error::FileIo { .. } + | Error::SpawnThread { .. } + | Error::InitTlsProvider { .. } => StatusCode::Unexpected, + + Error::Other { source, .. } => source.status_code(), + + Error::BuildRuntime { source, .. } => source.status_code(), + + Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal, + Error::MetaClientInit { source, .. } => source.status_code(), + Error::SchemaNotFound { .. } => StatusCode::DatabaseNotFound, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/cmd/src/cli/export.rs b/src/cli/src/export.rs similarity index 83% rename from src/cmd/src/cli/export.rs rename to src/cli/src/export.rs index 6d6cb6756b82..990265013bbd 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cli/src/export.rs @@ -26,11 +26,10 @@ use tokio::fs::File; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::Semaphore; use tokio::time::Instant; -use tracing_appender::non_blocking::WorkerGuard; -use crate::cli::database::DatabaseClient; -use crate::cli::{database, Instance, Tool}; +use crate::database::DatabaseClient; use crate::error::{EmptyResultSnafu, Error, FileIoSnafu, Result, SchemaNotFoundSnafu}; +use crate::{database, Tool}; type TableReference = (String, String, String); @@ -94,7 +93,7 @@ pub struct ExportCommand { } impl ExportCommand { - pub async fn build(&self, guard: Vec) -> Result { + pub async fn build(&self) -> Result> { let (catalog, schema) = database::split_database(&self.database)?; let database_client = DatabaseClient::new( @@ -105,19 +104,16 @@ impl ExportCommand { self.timeout.unwrap_or_default(), ); - Ok(Instance::new( - Box::new(Export { - catalog, - schema, - database_client, - output_dir: self.output_dir.clone(), - parallelism: self.export_jobs, - target: self.target.clone(), - start_time: self.start_time.clone(), - end_time: self.end_time.clone(), - }), - guard, - )) + Ok(Box::new(Export { + catalog, + schema, + database_client, + output_dir: self.output_dir.clone(), + parallelism: self.export_jobs, + target: self.target.clone(), + start_time: self.start_time.clone(), + end_time: self.end_time.clone(), + })) } } @@ -480,82 +476,3 @@ impl Tool for Export { } } } - -#[cfg(test)] -mod tests { - use clap::Parser; - use client::{Client, Database}; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_telemetry::logging::LoggingOptions; - - use crate::error::Result as CmdResult; - use crate::options::GlobalOptions; - use crate::{cli, standalone, App}; - - #[tokio::test(flavor = "multi_thread")] - async fn test_export_create_table_with_quoted_names() -> CmdResult<()> { - let output_dir = tempfile::tempdir().unwrap(); - - let standalone = standalone::Command::parse_from([ - "standalone", - "start", - "--data-home", - &*output_dir.path().to_string_lossy(), - ]); - - let standalone_opts = standalone.load_options(&GlobalOptions::default()).unwrap(); - let mut instance = standalone.build(standalone_opts).await?; - instance.start().await?; - - let client = Client::with_urls(["127.0.0.1:4001"]); - let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - database - .sql(r#"CREATE DATABASE "cli.export.create_table";"#) - .await - .unwrap(); - database - .sql( - r#"CREATE TABLE "cli.export.create_table"."a.b.c"( - ts TIMESTAMP, - TIME INDEX (ts) - ) engine=mito; - "#, - ) - .await - .unwrap(); - - let output_dir = tempfile::tempdir().unwrap(); - let cli = cli::Command::parse_from([ - "cli", - "export", - "--addr", - "127.0.0.1:4000", - "--output-dir", - &*output_dir.path().to_string_lossy(), - "--target", - "schema", - ]); - let mut cli_app = cli.build(LoggingOptions::default()).await?; - cli_app.start().await?; - - instance.stop().await?; - - let output_file = output_dir - .path() - .join("greptime") - .join("cli.export.create_table") - .join("create_tables.sql"); - let res = std::fs::read_to_string(output_file).unwrap(); - let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" ( - "ts" TIMESTAMP(3) NOT NULL, - TIME INDEX ("ts") -) - -ENGINE=mito -; -"#; - assert_eq!(res.trim(), expect.trim()); - - Ok(()) - } -} diff --git a/src/cmd/src/cli/helper.rs b/src/cli/src/helper.rs similarity index 99% rename from src/cmd/src/cli/helper.rs rename to src/cli/src/helper.rs index 08b12595149e..ee47e0f577b1 100644 --- a/src/cmd/src/cli/helper.rs +++ b/src/cli/src/helper.rs @@ -19,7 +19,7 @@ use rustyline::highlight::{Highlighter, MatchingBracketHighlighter}; use rustyline::hint::{Hinter, HistoryHinter}; use rustyline::validate::{ValidationContext, ValidationResult, Validator}; -use crate::cli::cmd::ReplCommand; +use crate::cmd::ReplCommand; pub(crate) struct RustylineHelper { hinter: HistoryHinter, diff --git a/src/cmd/src/cli/import.rs b/src/cli/src/import.rs similarity index 92% rename from src/cmd/src/cli/import.rs rename to src/cli/src/import.rs index 9cb7b60f59e7..ad42a3be4b5f 100644 --- a/src/cmd/src/cli/import.rs +++ b/src/cli/src/import.rs @@ -23,11 +23,10 @@ use common_telemetry::{error, info, warn}; use snafu::{OptionExt, ResultExt}; use tokio::sync::Semaphore; use tokio::time::Instant; -use tracing_appender::non_blocking::WorkerGuard; -use crate::cli::database::DatabaseClient; -use crate::cli::{database, Instance, Tool}; +use crate::database::DatabaseClient; use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu}; +use crate::{database, Tool}; #[derive(Debug, Default, Clone, ValueEnum)] enum ImportTarget { @@ -79,7 +78,7 @@ pub struct ImportCommand { } impl ImportCommand { - pub async fn build(&self, guard: Vec) -> Result { + pub async fn build(&self) -> Result> { let (catalog, schema) = database::split_database(&self.database)?; let database_client = DatabaseClient::new( self.addr.clone(), @@ -89,17 +88,14 @@ impl ImportCommand { self.timeout.unwrap_or_default(), ); - Ok(Instance::new( - Box::new(Import { - catalog, - schema, - database_client, - input_dir: self.input_dir.clone(), - parallelism: self.import_jobs, - target: self.target.clone(), - }), - guard, - )) + Ok(Box::new(Import { + catalog, + schema, + database_client, + input_dir: self.input_dir.clone(), + parallelism: self.import_jobs, + target: self.target.clone(), + })) } } diff --git a/src/cli/src/lib.rs b/src/cli/src/lib.rs new file mode 100644 index 000000000000..a20d777db3bd --- /dev/null +++ b/src/cli/src/lib.rs @@ -0,0 +1,58 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod bench; +pub mod error; +// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373 +#[allow(unused)] +mod cmd; +mod export; +mod helper; + +// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373 +mod database; +mod import; +#[allow(unused)] +mod repl; + +use async_trait::async_trait; +use clap::Parser; +use error::Result; +pub use repl::Repl; + +pub use crate::bench::BenchTableMetadataCommand; +pub use crate::export::ExportCommand; +pub use crate::import::ImportCommand; + +#[async_trait] +pub trait Tool: Send + Sync { + async fn do_work(&self) -> Result<()>; +} + +#[derive(Debug, Parser)] +pub(crate) struct AttachCommand { + #[clap(long)] + pub(crate) grpc_addr: String, + #[clap(long)] + pub(crate) meta_addr: Option, + #[clap(long, action)] + pub(crate) disable_helper: bool, +} + +impl AttachCommand { + #[allow(dead_code)] + async fn build(self) -> Result> { + unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373") + } +} diff --git a/src/cmd/src/cli/repl.rs b/src/cli/src/repl.rs similarity index 98% rename from src/cmd/src/cli/repl.rs rename to src/cli/src/repl.rs index 8c6e154a26d6..4c2ef8ffe396 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cli/src/repl.rs @@ -20,6 +20,7 @@ use cache::{ build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME, TABLE_ROUTE_CACHE_NAME, }; +use catalog::information_extension::DistributedInformationExtension; use catalog::kvbackend::{ CachedKvBackend, CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend, }; @@ -44,15 +45,14 @@ use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; -use crate::cli::cmd::ReplCommand; -use crate::cli::helper::RustylineHelper; -use crate::cli::AttachCommand; +use crate::cmd::ReplCommand; use crate::error::{ CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu, ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu, SubstraitEncodeLogicalPlanSnafu, }; -use crate::{error, DistributedInformationExtension}; +use crate::helper::RustylineHelper; +use crate::{error, AttachCommand}; /// Captures the state of the repl, gathers commands and executes them one by one pub struct Repl { diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index c1f20cc9c526..3b498c829215 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -25,6 +25,7 @@ cache.workspace = true catalog.workspace = true chrono.workspace = true clap.workspace = true +cli.workspace = true client.workspace = true common-base.workspace = true common-catalog.workspace = true diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index fc43e0997665..55ebe64bc262 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -12,39 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod bench; - -// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373 -#[allow(unused)] -mod cmd; -mod export; -mod helper; - -// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373 -mod database; -mod import; -#[allow(unused)] -mod repl; - -use async_trait::async_trait; -use bench::BenchTableMetadataCommand; use clap::Parser; +use cli::Tool; use common_telemetry::logging::{LoggingOptions, TracingOptions}; -pub use repl::Repl; +use plugins::SubCommand; +use snafu::ResultExt; use tracing_appender::non_blocking::WorkerGuard; -use self::export::ExportCommand; -use crate::cli::import::ImportCommand; -use crate::error::Result; use crate::options::GlobalOptions; -use crate::App; - +use crate::{error, App, Result}; pub const APP_NAME: &str = "greptime-cli"; - -#[async_trait] -pub trait Tool: Send + Sync { - async fn do_work(&self) -> Result<()>; -} +use async_trait::async_trait; pub struct Instance { tool: Box, @@ -54,12 +32,16 @@ pub struct Instance { } impl Instance { - fn new(tool: Box, guard: Vec) -> Self { + pub fn new(tool: Box, guard: Vec) -> Self { Self { tool, _guard: guard, } } + + pub async fn start(&mut self) -> Result<()> { + self.tool.do_work().await.context(error::StartCliSnafu) + } } #[async_trait] @@ -69,7 +51,8 @@ impl App for Instance { } async fn start(&mut self) -> Result<()> { - self.tool.do_work().await + self.start().await.unwrap(); + Ok(()) } fn wait_signal(&self) -> bool { @@ -96,7 +79,12 @@ impl Command { None, ); - self.cmd.build(guard).await + let tool = self.cmd.build().await.context(error::BuildCliSnafu)?; + let instance = Instance { + tool, + _guard: guard, + }; + Ok(instance) } pub fn load_options(&self, global_options: &GlobalOptions) -> Result { @@ -112,38 +100,81 @@ impl Command { } } -#[derive(Parser)] -enum SubCommand { - // Attach(AttachCommand), - Bench(BenchTableMetadataCommand), - Export(ExportCommand), - Import(ImportCommand), -} +#[cfg(test)] +mod tests { + use clap::Parser; + use client::{Client, Database}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_telemetry::logging::LoggingOptions; + + use crate::error::Result as CmdResult; + use crate::options::GlobalOptions; + use crate::{cli, standalone, App}; + + #[tokio::test(flavor = "multi_thread")] + async fn test_export_create_table_with_quoted_names() -> CmdResult<()> { + let output_dir = tempfile::tempdir().unwrap(); + + let standalone = standalone::Command::parse_from([ + "standalone", + "start", + "--data-home", + &*output_dir.path().to_string_lossy(), + ]); + + let standalone_opts = standalone.load_options(&GlobalOptions::default()).unwrap(); + let mut instance = standalone.build(standalone_opts).await?; + instance.start().await?; + + let client = Client::with_urls(["127.0.0.1:4001"]); + let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + database + .sql(r#"CREATE DATABASE "cli.export.create_table";"#) + .await + .unwrap(); + database + .sql( + r#"CREATE TABLE "cli.export.create_table"."a.b.c"( + ts TIMESTAMP, + TIME INDEX (ts) + ) engine=mito; + "#, + ) + .await + .unwrap(); + + let output_dir = tempfile::tempdir().unwrap(); + let cli = cli::Command::parse_from([ + "cli", + "export", + "--addr", + "127.0.0.1:4000", + "--output-dir", + &*output_dir.path().to_string_lossy(), + "--target", + "schema", + ]); + let mut cli_app = cli.build(LoggingOptions::default()).await?; + cli_app.start().await?; + + instance.stop().await?; + + let output_file = output_dir + .path() + .join("greptime") + .join("cli.export.create_table") + .join("create_tables.sql"); + let res = std::fs::read_to_string(output_file).unwrap(); + let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" ( + "ts" TIMESTAMP(3) NOT NULL, + TIME INDEX ("ts") +) + +ENGINE=mito +; +"#; + assert_eq!(res.trim(), expect.trim()); -impl SubCommand { - async fn build(&self, guard: Vec) -> Result { - match self { - // SubCommand::Attach(cmd) => cmd.build().await, - SubCommand::Bench(cmd) => cmd.build(guard).await, - SubCommand::Export(cmd) => cmd.build(guard).await, - SubCommand::Import(cmd) => cmd.build(guard).await, - } - } -} - -#[derive(Debug, Parser)] -pub(crate) struct AttachCommand { - #[clap(long)] - pub(crate) grpc_addr: String, - #[clap(long)] - pub(crate) meta_addr: Option, - #[clap(long, action)] - pub(crate) disable_helper: bool, -} - -impl AttachCommand { - #[allow(dead_code)] - async fn build(self) -> Result { - unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373") + Ok(()) } } diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index f042b48478d4..80e788208ff0 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -114,6 +114,20 @@ pub enum Error { source: frontend::error::Error, }, + #[snafu(display("Failed to build cli"))] + BuildCli { + #[snafu(implicit)] + location: Location, + source: cli::error::Error, + }, + + #[snafu(display("Failed to start cli"))] + StartCli { + #[snafu(implicit)] + location: Location, + source: cli::error::Error, + }, + #[snafu(display("Failed to build meta server"))] BuildMetaServer { #[snafu(implicit)] @@ -346,6 +360,8 @@ impl ErrorExt for Error { Error::ShutdownMetaServer { source, .. } => source.status_code(), Error::BuildMetaServer { source, .. } => source.status_code(), Error::UnsupportedSelectorType { source, .. } => source.status_code(), + Error::BuildCli { source, .. } => source.status_code(), + Error::StartCli { source, .. } => source.status_code(), Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => { source.status_code() diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index a2b6b41c019a..a9ad12bfbc02 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; +use catalog::information_extension::DistributedInformationExtension; use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use clap::Parser; use client::client_manager::NodeClients; @@ -41,7 +42,7 @@ use crate::error::{ MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; -use crate::{log_versions, App, DistributedInformationExtension}; +use crate::{log_versions, App}; pub const APP_NAME: &str = "greptime-flownode"; diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index d90a286fc451..36bd37a51980 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -17,6 +17,7 @@ use std::time::Duration; use async_trait::async_trait; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; +use catalog::information_extension::DistributedInformationExtension; use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use clap::Parser; use client::client_manager::NodeClients; @@ -46,7 +47,7 @@ use crate::error::{ Result, StartFrontendSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; -use crate::{log_versions, App, DistributedInformationExtension}; +use crate::{log_versions, App}; type FrontendOptions = GreptimeOptions; diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 3a719b9589a7..acd27f46d731 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -15,17 +15,7 @@ #![feature(assert_matches, let_chains)] use async_trait::async_trait; -use catalog::information_schema::InformationExtension; -use client::api::v1::meta::ProcedureStatus; -use common_error::ext::BoxedError; -use common_meta::cluster::{ClusterInfo, NodeInfo}; -use common_meta::datanode::RegionStat; -use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; -use common_meta::rpc::procedure; -use common_procedure::{ProcedureInfo, ProcedureState}; use common_telemetry::{error, info}; -use meta_client::MetaClientRef; -use snafu::ResultExt; use crate::error::Result; @@ -130,69 +120,3 @@ fn log_env_flags() { info!("argument: {}", argument); } } - -pub struct DistributedInformationExtension { - meta_client: MetaClientRef, -} - -impl DistributedInformationExtension { - pub fn new(meta_client: MetaClientRef) -> Self { - Self { meta_client } - } -} - -#[async_trait::async_trait] -impl InformationExtension for DistributedInformationExtension { - type Error = catalog::error::Error; - - async fn nodes(&self) -> std::result::Result, Self::Error> { - self.meta_client - .list_nodes(None) - .await - .map_err(BoxedError::new) - .context(catalog::error::ListNodesSnafu) - } - - async fn procedures(&self) -> std::result::Result, Self::Error> { - let procedures = self - .meta_client - .list_procedures(&ExecutorContext::default()) - .await - .map_err(BoxedError::new) - .context(catalog::error::ListProceduresSnafu)? - .procedures; - let mut result = Vec::with_capacity(procedures.len()); - for procedure in procedures { - let pid = match procedure.id { - Some(pid) => pid, - None => return catalog::error::ProcedureIdNotFoundSnafu {}.fail(), - }; - let pid = procedure::pb_pid_to_pid(&pid) - .map_err(BoxedError::new) - .context(catalog::error::ConvertProtoDataSnafu)?; - let status = ProcedureStatus::try_from(procedure.status) - .map(|v| v.as_str_name()) - .unwrap_or("Unknown") - .to_string(); - let procedure_info = ProcedureInfo { - id: pid, - type_name: procedure.type_name, - start_time_ms: procedure.start_time_ms, - end_time_ms: procedure.end_time_ms, - state: ProcedureState::Running, - lock_keys: procedure.lock_keys, - }; - result.push((status, procedure_info)); - } - - Ok(result) - } - - async fn region_stats(&self) -> std::result::Result, Self::Error> { - self.meta_client - .list_region_stats() - .await - .map_err(BoxedError::new) - .context(catalog::error::ListRegionStatsSnafu) - } -} diff --git a/src/plugins/Cargo.toml b/src/plugins/Cargo.toml index 977b7d9b7c7e..7582295ecde5 100644 --- a/src/plugins/Cargo.toml +++ b/src/plugins/Cargo.toml @@ -9,6 +9,8 @@ workspace = true [dependencies] auth.workspace = true +clap.workspace = true +cli.workspace = true common-base.workspace = true datanode.workspace = true frontend.workspace = true diff --git a/src/plugins/src/cli.rs b/src/plugins/src/cli.rs new file mode 100644 index 000000000000..74543a162355 --- /dev/null +++ b/src/plugins/src/cli.rs @@ -0,0 +1,36 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use clap::Parser; +use cli::error::Result; +use cli::{BenchTableMetadataCommand, ExportCommand, ImportCommand, Tool}; + +#[derive(Parser)] +pub enum SubCommand { + // Attach(AttachCommand), + Bench(BenchTableMetadataCommand), + Export(ExportCommand), + Import(ImportCommand), +} + +impl SubCommand { + pub async fn build(&self) -> Result> { + match self { + // SubCommand::Attach(cmd) => cmd.build().await, + SubCommand::Bench(cmd) => cmd.build().await, + SubCommand::Export(cmd) => cmd.build().await, + SubCommand::Import(cmd) => cmd.build().await, + } + } +} diff --git a/src/plugins/src/lib.rs b/src/plugins/src/lib.rs index a29ed0e4a8e7..fdb7abc01710 100644 --- a/src/plugins/src/lib.rs +++ b/src/plugins/src/lib.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod cli; mod datanode; mod frontend; mod meta_srv; mod options; +pub use cli::SubCommand; pub use datanode::{setup_datanode_plugins, start_datanode_plugins}; pub use frontend::{setup_frontend_plugins, start_frontend_plugins}; pub use meta_srv::{setup_metasrv_plugins, start_metasrv_plugins}; diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 8bdb8299f7c4..83778da5bbc4 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -23,10 +23,10 @@ use cache::{ build_datanode_cache_registry, build_fundamental_cache_registry, with_default_composite_cache_registry, }; +use catalog::information_extension::DistributedInformationExtension; use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use client::client_manager::NodeClients; use client::Client; -use cmd::DistributedInformationExtension; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};