refactor: remove duplicate QueryContext
WenyXu committed Dec 19, 2024
1 parent 422d18d commit c951b5c
Showing 8 changed files with 31 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -124,7 +124,7 @@ etcd-client = "0.13"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2e80832079e9dd3c493029cb4db062b36b4b7bdd" }
 hex = "0.4"
 humantime = "2.1"
 humantime-serde = "1.1"
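
Note: the greptime-proto revision bump is presumably what makes QueryContext available under api::v1, which the rest of this commit switches to. A rough sketch of the message as used below; the field names and types are taken from the duplicate struct deleted in src/common/meta/src/rpc/ddl.rs, while the exact prost-generated shape is an assumption:

    use std::collections::HashMap;

    // Assumed shape of the generated api::v1::QueryContext. The deleted
    // conversion cast `channel` with `as u32`, so it is u32 on the wire.
    #[derive(Clone, Debug, Default)]
    pub struct QueryContext {
        pub current_catalog: String,
        pub current_schema: String,
        pub timezone: String,
        pub extensions: HashMap<String, String>,
        pub channel: u32,
    }
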
9 changes: 4 additions & 5 deletions src/common/meta/src/ddl/create_flow.rs
@@ -18,7 +18,7 @@ use std::collections::BTreeMap;
 
 use api::v1::flow::flow_request::Body as PbFlowRequest;
 use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
-use api::v1::ExpireAfter;
+use api::v1::{ExpireAfter, QueryContext};
 use async_trait::async_trait;
 use common_catalog::format_full_flow_name;
 use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
@@ -35,9 +35,8 @@ use snafu::{ensure, ResultExt};
 use strum::AsRefStr;
 use table::metadata::TableId;
 
-use super::utils::add_peer_context_if_needed;
 use crate::cache_invalidator::Context;
-use crate::ddl::utils::handle_retry_error;
+use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
 use crate::ddl::DdlContext;
 use crate::error::{self, Result};
 use crate::instruction::{CacheIdent, CreateFlow};
@@ -47,7 +46,7 @@ use crate::key::table_name::TableNameKey;
 use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
 use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
 use crate::peer::Peer;
-use crate::rpc::ddl::{CreateFlowTask, QueryContext};
+use crate::rpc::ddl::CreateFlowTask;
 use crate::{metrics, ClusterId};
 
 /// The procedure of flow creation.
@@ -187,7 +186,7 @@ impl CreateFlowProcedure {
         let request = FlowRequest {
             header: Some(FlowRequestHeader {
                 tracing_context: TracingContext::from_current_span().to_w3c(),
-                query_context: Some(self.data.query_context.clone().into()),
+                query_context: Some(self.data.query_context.clone()),
             }),
             body: Some(PbFlowRequest::Create((&self.data).into())),
         };
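
With the intermediate type gone, `self.data.query_context` is presumably already the proto message, so the header takes a plain clone instead of a conversion. A minimal sketch of what the header construction amounts to (the helper name and the common_telemetry import path are assumptions; the field names come from the diff above):

    use api::v1::flow::FlowRequestHeader;
    use api::v1::QueryContext;
    use common_telemetry::tracing_context::TracingContext;

    // Hypothetical helper mirroring the updated code path: the stored context is
    // already api::v1::QueryContext, so no `.into()` is needed.
    fn build_flow_header(query_context: &QueryContext) -> FlowRequestHeader {
        FlowRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            query_context: Some(query_context.clone()),
        }
    }
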
29 changes: 19 additions & 10 deletions src/common/meta/src/ddl/tests/create_flow.rs
@@ -64,8 +64,8 @@ async fn test_create_flow_source_table_not_found() {
     let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false);
     let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
     let ddl_context = new_ddl_context(node_manager);
-    let query_ctx = QueryContext::arc().into();
-    let mut procedure = CreateFlowProcedure::new(cluster_id, task, query_ctx, ddl_context);
+    let query_ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
+    let mut procedure = CreateFlowProcedure::new(cluster_id, task, query_ctx.into(), ddl_context);
     let err = procedure.on_prepare().await.unwrap_err();
     assert_matches!(err, error::Error::TableNotFound { .. });
 }
@@ -83,9 +83,13 @@ pub(crate) async fn create_test_flow(
         sink_table_name.clone(),
         false,
     );
-    let query_ctx = QueryContext::arc().into();
-    let mut procedure =
-        CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context.clone());
+    let query_ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
+    let mut procedure = CreateFlowProcedure::new(
+        cluster_id,
+        task.clone(),
+        query_ctx.into(),
+        ddl_context.clone(),
+    );
     let output = execute_procedure_until_done(&mut procedure).await.unwrap();
     let flow_id = output.downcast_ref::<FlowId>().unwrap();
 
@@ -133,17 +137,22 @@ async fn test_create_flow() {
         sink_table_name.clone(),
         true,
     );
-    let query_ctx = QueryContext::arc().into();
-    let mut procedure =
-        CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context.clone());
+    let query_ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
+    let mut procedure = CreateFlowProcedure::new(
+        cluster_id,
+        task.clone(),
+        query_ctx.into(),
+        ddl_context.clone(),
+    );
     let output = execute_procedure_until_done(&mut procedure).await.unwrap();
     let flow_id = output.downcast_ref::<FlowId>().unwrap();
     assert_eq!(*flow_id, 1024);
 
     // Creates again
     let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false);
-    let query_ctx = QueryContext::arc().into();
-    let mut procedure = CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context);
+    let query_ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
+    let mut procedure =
+        CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx.into(), ddl_context);
     let err = procedure.on_prepare().await.unwrap_err();
     assert_matches!(err, error::Error::FlowAlreadyExists { .. });
 }
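
Previously `QueryContext::arc().into()` went through the now-deleted meta-level wrapper; the tests now build a session-level context explicitly and convert it at the call site. A condensed sketch of the assumed setup (the import paths and helper name are assumptions; that `From<QueryContext> for api::v1::QueryContext` exists is implied by the `.into()` calls above):

    use api::v1::QueryContext as PbQueryContext;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use session::context::QueryContext;

    // Hypothetical helper: a session context scoped to the default catalog/schema,
    // converted into the proto message expected by CreateFlowProcedure::new.
    fn pb_query_ctx_for_tests() -> PbQueryContext {
        QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME).into()
    }
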
5 changes: 3 additions & 2 deletions src/common/meta/src/ddl_manager.rs
@@ -15,6 +15,7 @@
 use std::sync::Arc;
 
 use api::v1::meta::ProcedureDetailResponse;
+use api::v1::QueryContext;
 use common_procedure::{
     watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId,
 };
@@ -54,7 +55,7 @@ use crate::rpc::ddl::DdlTask::{
 };
 use crate::rpc::ddl::{
     AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
-    CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
+    CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
     SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
 };
 use crate::rpc::procedure;
@@ -824,7 +825,7 @@ impl ProcedureExecutor for DdlManager {
                    self,
                    cluster_id,
                    create_flow_task,
-                    request.query_context.into(),
+                    (*request.query_context).clone().into(),
                )
                .await
            }
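
`request.query_context` is presumably an Arc'd session context (a QueryContextRef), hence the deref-and-clone before converting into the proto type; the same pattern appears in src/operator/src/flow.rs below. A small illustration (the type paths are assumptions; the conversion relies on the same `From` impl implied by the `.into()` calls in this commit):

    use api::v1::QueryContext as PbQueryContext;
    use session::context::QueryContextRef;

    // Hypothetical helper showing the call-site pattern: deref the Arc, clone the
    // owned session context, then convert it into the proto message.
    fn to_pb_query_context(ctx: QueryContextRef) -> PbQueryContext {
        (*ctx).clone().into()
    }
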
43 changes: 1 addition & 42 deletions src/common/meta/src/rpc/ddl.rs
@@ -31,7 +31,7 @@ use api::v1::meta::{
 use api::v1::{
     AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
     CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
-    Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
+    Option as PbOption, TruncateTableExpr,
 };
 use base64::engine::general_purpose;
 use base64::Engine as _;
@@ -1202,47 +1202,6 @@ impl From<DropFlowTask> for PbDropFlowTask {
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct QueryContext {
-    current_catalog: String,
-    current_schema: String,
-    timezone: String,
-    extensions: HashMap<String, String>,
-    channel: u8,
-}
-
-impl From<QueryContextRef> for QueryContext {
-    fn from(query_context: QueryContextRef) -> Self {
-        QueryContext {
-            current_catalog: query_context.current_catalog().to_string(),
-            current_schema: query_context.current_schema().to_string(),
-            timezone: query_context.timezone().to_string(),
-            extensions: query_context.extensions(),
-            channel: query_context.channel() as u8,
-        }
-    }
-}
-
-impl From<QueryContext> for PbQueryContext {
-    fn from(
-        QueryContext {
-            current_catalog,
-            current_schema,
-            timezone,
-            extensions,
-            channel,
-        }: QueryContext,
-    ) -> Self {
-        PbQueryContext {
-            current_catalog,
-            current_schema,
-            timezone,
-            extensions,
-            channel: channel as u32,
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
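
For reference, the deleted wrapper only shuttled fields from the session context into the protobuf message. A hypothetical one-step equivalent of that two-hop path (the helper name is made up; the accessor calls are exactly those used in the deleted `From` impls, and `..Default::default()` is there in case the proto message has fields not listed here):

    use api::v1::QueryContext as PbQueryContext;
    use session::context::QueryContextRef;

    // What the removed type amounted to: copy each field of the session context
    // straight into the proto message, with no state of its own.
    fn session_ctx_to_pb(ctx: &QueryContextRef) -> PbQueryContext {
        PbQueryContext {
            current_catalog: ctx.current_catalog().to_string(),
            current_schema: ctx.current_schema().to_string(),
            timezone: ctx.timezone().to_string(),
            extensions: ctx.extensions(),
            channel: ctx.channel() as u32,
            ..Default::default()
        }
    }
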
1 change: 1 addition & 0 deletions src/meta-srv/src/service/procedure.rs
@@ -71,6 +71,7 @@ impl procedure_service_server::ProcedureService for Metasrv {
                 param: "query_context",
             })?
             .into();
+
         let task: DdlTask = task
             .context(error::MissingRequiredParameterSnafu { param: "task" })?
             .try_into()
4 changes: 1 addition & 3 deletions src/operator/src/flow.rs
@@ -102,9 +102,7 @@ impl FlowServiceOperator {
         let flush_req = FlowRequest {
             header: Some(FlowRequestHeader {
                 tracing_context: TracingContext::from_current_span().to_w3c(),
-                query_context: Some(
-                    common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
-                ),
+                query_context: Some((*ctx).clone().into()),
             }),
             body: Some(flow_request::Body::Flush(FlushFlow {
                 flow_id: Some(api::v1::FlowId { id }),
