diff --git a/Cargo.lock b/Cargo.lock index fa8ba34d1a3b..5d01629eae6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4487,7 +4487,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a875e976441188028353f7274a46a7e6e065c5d4#a875e976441188028353f7274a46a7e6e065c5d4" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2e80832079e9dd3c493029cb4db062b36b4b7bdd#2e80832079e9dd3c493029cb4db062b36b4b7bdd" dependencies = [ "prost 0.12.6", "serde", diff --git a/Cargo.toml b/Cargo.toml index 990bc71a907b..51ad8b522262 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,7 +124,7 @@ etcd-client = "0.13" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2e80832079e9dd3c493029cb4db062b36b4b7bdd" } hex = "0.4" humantime = "2.1" humantime-serde = "1.1" diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 177bdf6b716a..de3207a105df 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -18,7 +18,7 @@ use std::collections::BTreeMap; use api::v1::flow::flow_request::Body as PbFlowRequest; use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader}; -use api::v1::ExpireAfter; +use api::v1::{ExpireAfter, QueryContext}; use async_trait::async_trait; use common_catalog::format_full_flow_name; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; @@ -35,9 +35,8 @@ use snafu::{ensure, ResultExt}; use strum::AsRefStr; use table::metadata::TableId; -use super::utils::add_peer_context_if_needed; use crate::cache_invalidator::Context; -use crate::ddl::utils::handle_retry_error; +use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error}; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::instruction::{CacheIdent, CreateFlow}; @@ -47,7 +46,7 @@ use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId}; use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock}; use crate::peer::Peer; -use crate::rpc::ddl::{CreateFlowTask, QueryContext}; +use crate::rpc::ddl::CreateFlowTask; use crate::{metrics, ClusterId}; /// The procedure of flow creation. @@ -187,7 +186,7 @@ impl CreateFlowProcedure { let request = FlowRequest { header: Some(FlowRequestHeader { tracing_context: TracingContext::from_current_span().to_w3c(), - query_context: Some(self.data.query_context.clone().into()), + query_context: Some(self.data.query_context.clone()), }), body: Some(PbFlowRequest::Create((&self.data).into())), }; diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index a130e0590c47..483d8e5eb1ad 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -64,8 +64,8 @@ async fn test_create_flow_source_table_not_found() { let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false); let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); let ddl_context = new_ddl_context(node_manager); - let query_ctx = QueryContext::arc().into(); - let mut procedure = CreateFlowProcedure::new(cluster_id, task, query_ctx, ddl_context); + let query_ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + let mut procedure = CreateFlowProcedure::new(cluster_id, task, query_ctx.into(), ddl_context); let err = procedure.on_prepare().await.unwrap_err(); assert_matches!(err, error::Error::TableNotFound { .. }); } @@ -83,9 +83,13 @@ pub(crate) async fn create_test_flow( sink_table_name.clone(), false, ); - let query_ctx = QueryContext::arc().into(); - let mut procedure = - CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context.clone()); + let query_ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + let mut procedure = CreateFlowProcedure::new( + cluster_id, + task.clone(), + query_ctx.into(), + ddl_context.clone(), + ); let output = execute_procedure_until_done(&mut procedure).await.unwrap(); let flow_id = output.downcast_ref::().unwrap(); @@ -133,17 +137,22 @@ async fn test_create_flow() { sink_table_name.clone(), true, ); - let query_ctx = QueryContext::arc().into(); - let mut procedure = - CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context.clone()); + let query_ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + let mut procedure = CreateFlowProcedure::new( + cluster_id, + task.clone(), + query_ctx.into(), + ddl_context.clone(), + ); let output = execute_procedure_until_done(&mut procedure).await.unwrap(); let flow_id = output.downcast_ref::().unwrap(); assert_eq!(*flow_id, 1024); // Creates again let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false); - let query_ctx = QueryContext::arc().into(); - let mut procedure = CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context); + let query_ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + let mut procedure = + CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx.into(), ddl_context); let err = procedure.on_prepare().await.unwrap_err(); assert_matches!(err, error::Error::FlowAlreadyExists { .. }); } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index bac640d401a6..707b6915a329 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use api::v1::meta::ProcedureDetailResponse; +use api::v1::QueryContext; use common_procedure::{ watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, }; @@ -54,7 +55,7 @@ use crate::rpc::ddl::DdlTask::{ }; use crate::rpc::ddl::{ AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, - CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext, + CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, }; use crate::rpc::procedure; @@ -824,7 +825,7 @@ impl ProcedureExecutor for DdlManager { self, cluster_id, create_flow_task, - request.query_context.into(), + (*request.query_context).clone().into(), ) .await } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index bec12796e791..45ff1d4e1813 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -31,7 +31,7 @@ use api::v1::meta::{ use api::v1::{ AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter, - Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr, + Option as PbOption, TruncateTableExpr, }; use base64::engine::general_purpose; use base64::Engine as _; @@ -1202,47 +1202,6 @@ impl From for PbDropFlowTask { } } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct QueryContext { - current_catalog: String, - current_schema: String, - timezone: String, - extensions: HashMap, - channel: u8, -} - -impl From for QueryContext { - fn from(query_context: QueryContextRef) -> Self { - QueryContext { - current_catalog: query_context.current_catalog().to_string(), - current_schema: query_context.current_schema().to_string(), - timezone: query_context.timezone().to_string(), - extensions: query_context.extensions(), - channel: query_context.channel() as u8, - } - } -} - -impl From for PbQueryContext { - fn from( - QueryContext { - current_catalog, - current_schema, - timezone, - extensions, - channel, - }: QueryContext, - ) -> Self { - PbQueryContext { - current_catalog, - current_schema, - timezone, - extensions, - channel: channel as u32, - } - } -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index e20bb2c4db33..965a46ac6536 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -71,6 +71,7 @@ impl procedure_service_server::ProcedureService for Metasrv { param: "query_context", })? .into(); + let task: DdlTask = task .context(error::MissingRequiredParameterSnafu { param: "task" })? .try_into() diff --git a/src/operator/src/flow.rs b/src/operator/src/flow.rs index 1c82fcf00af5..b0073876c962 100644 --- a/src/operator/src/flow.rs +++ b/src/operator/src/flow.rs @@ -102,9 +102,7 @@ impl FlowServiceOperator { let flush_req = FlowRequest { header: Some(FlowRequestHeader { tracing_context: TracingContext::from_current_span().to_w3c(), - query_context: Some( - common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(), - ), + query_context: Some((*ctx).clone().into()), }), body: Some(flow_request::Body::Flush(FlushFlow { flow_id: Some(api::v1::FlowId { id }),