diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 337ff9ddeebb..d38de1347af2 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -46,11 +46,9 @@ use tokio::sync::broadcast::error::TryRecvError;
 use tokio::sync::{broadcast, watch, Mutex, RwLock};
 
 pub(crate) use crate::adapter::node_context::FlownodeContext;
-use crate::adapter::table_source::KvBackendTableSource;
 use crate::adapter::refill::RefillTask;
-use crate::adapter::util::{
-    relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc,
-};
+use crate::adapter::table_source::KvBackendTableSource;
+use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
 use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
 use crate::compute::ErrCollector;
 use crate::df_optimizer::sql_to_flow_plan;
@@ -738,7 +736,7 @@ impl CreateFlowArgs {
 /// Create&Remove flow
 impl FlowWorkerManager {
     /// Get table info source
-    pub fn table_info_source(&self) -> &TableSource {
+    pub fn table_info_source(&self) -> &KvBackendTableSource {
         &self.table_info_source
     }
 
@@ -814,27 +812,8 @@ impl FlowWorkerManager {
                 .fail()?,
             }
         }
-
-        let table_id = self
-            .table_info_source
-            .get_table_id_from_name(sink_table_name)
-            .await?
-            .context(UnexpectedSnafu {
-                reason: format!("Can't get table id for table name {:?}", sink_table_name),
-            })?;
-        let table_info_value = self
-            .table_info_source
-            .get_table_info_value(&table_id)
-            .await?
-            .context(UnexpectedSnafu {
-                reason: format!("Can't get table info value for table id {:?}", table_id),
-            })?;
-        let real_schema = table_info_value_to_relation_desc(table_info_value)?;
-        node_ctx.assign_table_schema(sink_table_name, real_schema.clone())?;
         } else {
-            // assign inferred schema to sink table
             // create sink table
-            node_ctx.assign_table_schema(sink_table_name, flow_plan.schema.clone())?;
             let did_create = self
                 .create_table_from_relation(
                     &format!("flow-id={flow_id}"),
diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs
index 10732b5e6d50..33946cbb283d 100644
--- a/src/flow/src/adapter/node_context.rs
+++ b/src/flow/src/adapter/node_context.rs
@@ -67,6 +67,7 @@ impl FlownodeContext {
             source_to_tasks: Default::default(),
             flow_to_sink: Default::default(),
             sink_to_flow: Default::default(),
+            flow_plans: Default::default(),
             source_sender: Default::default(),
             sink_receiver: Default::default(),
             table_source,
diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs
index 324bb38bfe6c..015616d94c29 100644
--- a/src/flow/src/adapter/refill.rs
+++ b/src/flow/src/adapter/refill.rs
@@ -29,7 +29,7 @@ use snafu::{OptionExt, ResultExt};
 use table::metadata::TableId;
 
 use super::{FlowId, FlowWorkerManager};
-use crate::adapter::table_source::TableSource;
+use crate::adapter::table_source::KvBackendTableSource;
 use crate::adapter::FlowWorkerManagerRef;
 use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu};
 use crate::expr::error::ExternalSnafu;
@@ -331,7 +331,7 @@ impl RefillTask {
         table_id: TableId,
         time_range: Option<(common_time::Timestamp, common_time::Timestamp)>,
         time_col_name: &str,
-        table_src: &TableSource,
+        table_src: &KvBackendTableSource,
     ) -> Result<RefillTask, Error> {
         let (table_name, table_schema) = table_src.get_table_name_schema(&table_id).await?;
         let all_col_names: BTreeSet<_> = table_schema
diff --git a/src/flow/src/test_utils.rs b/src/flow/src/test_utils.rs
index 585fa233016c..d7e6172fc469 100644
--- a/src/flow/src/test_utils.rs
+++ b/src/flow/src/test_utils.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
 use std::sync::Arc;
 
 use catalog::RegisterTableRequest;
@@ -37,14 +36,13 @@ use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
 use table::test_util::MemTable;
 
 use crate::adapter::node_context::IdToNameMap;
+use crate::adapter::table_source::FlowDummyTableSource;
 use crate::adapter::FlownodeContext;
 use crate::df_optimizer::apply_df_optimizer;
 use crate::expr::GlobalId;
-use crate::repr::{ColumnType, RelationType};
 use crate::transform::register_function_to_query_engine;
 
 pub fn create_test_ctx() -> FlownodeContext {
-    let mut schemas = HashMap::new();
     let mut tri_map = IdToNameMap::new();
     {
         let gid = GlobalId::User(0);
         let name = [
             "greptime".to_string(),
             "public".to_string(),
             "numbers".to_string(),
         ];
-        let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
-
         tri_map.insert(Some(name.clone()), Some(1024), gid);
-        schemas.insert(gid, schema.into_named(vec![Some("number".to_string())]));
     }
 
     {
         let gid = GlobalId::User(1);
         let name = [
             "greptime".to_string(),
             "public".to_string(),
             "numbers_with_ts".to_string(),
         ];
-        let schema = RelationType::new(vec![
-            ColumnType::new(CDT::uint32_datatype(), false),
-            ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
-        ]);
-        schemas.insert(
-            gid,
-            schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
-        );
         tri_map.insert(Some(name.clone()), Some(1025), gid);
     }
 
-    FlownodeContext {
-        schema: schemas,
-        table_repr: tri_map,
-        query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),
-        ..Default::default()
-    }
+    let dummy_source = FlowDummyTableSource::default();
+
+    let mut ctx = FlownodeContext::new(Box::new(dummy_source));
+    ctx.table_repr = tri_map;
+    ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public")));
+
+    ctx
 }
 
 pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs
index 99da76bd0315..e2a9c2ff8059 100644
--- a/src/flow/src/transform.rs
+++ b/src/flow/src/transform.rs
@@ -156,117 +156,9 @@ mod test {
     use query::parser::QueryLanguageParser;
     use session::context::QueryContext;
 
-    use super::*;
-    use crate::adapter::node_context::IdToNameMap;
-    use crate::adapter::table_source::FlowDummyTableSource;
-    use crate::df_optimizer::apply_df_optimizer;
-    use crate::expr::GlobalId;
     use crate::df_optimizer::apply_df_optimizer;
     use crate::test_utils::create_test_query_engine;
 
-    pub fn create_test_ctx() -> FlownodeContext {
-        let mut tri_map = IdToNameMap::new();
-        {
-            let gid = GlobalId::User(0);
-            let name = [
-                "greptime".to_string(),
-                "public".to_string(),
-                "numbers".to_string(),
-            ];
-            tri_map.insert(Some(name.clone()), Some(1024), gid);
-        }
-
-        {
-            let gid = GlobalId::User(1);
-            let name = [
-                "greptime".to_string(),
-                "public".to_string(),
-                "numbers_with_ts".to_string(),
-            ];
-            tri_map.insert(Some(name.clone()), Some(1025), gid);
-        }
-
-        let dummy_source = FlowDummyTableSource::default();
-
-        let mut ctx = FlownodeContext::new(Box::new(dummy_source));
-        ctx.table_repr = tri_map;
-        ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public")));
-
-        ctx
-    }
-
-    pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
-        let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
-        let req = RegisterTableRequest {
-            catalog: DEFAULT_CATALOG_NAME.to_string(),
-            schema: DEFAULT_SCHEMA_NAME.to_string(),
-            table_name: NUMBERS_TABLE_NAME.to_string(),
-            table_id: NUMBERS_TABLE_ID,
-            table: NumbersTable::table(NUMBERS_TABLE_ID),
-        };
-        catalog_list.register_table_sync(req).unwrap();
-
-        let schema = vec![
-            datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false),
-            datatypes::schema::ColumnSchema::new(
-                "ts",
-                CDT::timestamp_millisecond_datatype(),
-                false,
-            ),
-        ];
-        let mut columns = vec![];
-        let numbers = (1..=10).collect_vec();
-        let column: VectorRef = Arc::new(<u32 as Scalar>::VectorType::from_vec(numbers));
-        columns.push(column);
-
-        let ts = (1..=10).collect_vec();
-        let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10);
-        ts.into_iter()
-            .map(|v| builder.push(Some(TimestampMillisecond::new(v))))
-            .count();
-        let column: VectorRef = builder.to_vector_cloned();
-        columns.push(column);
-
-        let schema = Arc::new(Schema::new(schema));
-        let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap();
-        let table = MemTable::table("numbers_with_ts", recordbatch);
-
-        let req_with_ts = RegisterTableRequest {
-            catalog: DEFAULT_CATALOG_NAME.to_string(),
-            schema: DEFAULT_SCHEMA_NAME.to_string(),
-            table_name: "numbers_with_ts".to_string(),
-            table_id: 1024,
-            table,
-        };
-        catalog_list.register_table_sync(req_with_ts).unwrap();
-
-        let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
-
-        let engine = factory.query_engine();
-        register_function_to_query_engine(&engine);
-
-        assert_eq!("datafusion", engine.name());
-        engine
-    }
-
-    pub async fn sql_to_substrait(engine: Arc<dyn QueryEngine>, sql: &str) -> proto::Plan {
-        // let engine = create_test_query_engine();
-        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
-        let plan = engine
-            .planner()
-            .plan(&stmt, QueryContext::arc())
-            .await
-            .unwrap();
-        let plan = apply_df_optimizer(plan).await.unwrap();
-
-        // encode then decode so to rely on the impl of conversion from logical plan to substrait plan
-        let bytes = DFLogicalSubstraitConvertor {}
-            .encode(&plan, DefaultSerializer)
-            .unwrap();
-
-        proto::Plan::decode(bytes).unwrap()
-    }
-
     /// TODO(discord9): add more illegal sql tests
     #[tokio::test]
     async fn test_missing_key_check() {
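
The overall shape of this refactor: `FlownodeContext` no longer carries a hand-filled schema map; it owns a boxed table source instead, with `KvBackendTableSource` backing real metadata lookups and `FlowDummyTableSource` serving fixed tables in tests. Below is a minimal standalone sketch of that shape; the trait name `FlowTableSource`, the `table_schema` method, and the `TableSchema` and `Context` types are illustrative assumptions for this sketch, not the crate's actual definitions:

```rust
// Standalone sketch of the "boxed table source" shape this diff introduces.
// `FlowTableSource` mirrors the role played by the trait behind
// `KvBackendTableSource`/`FlowDummyTableSource`; `TableSchema`, `table_schema`,
// and `Context` are hypothetical stand-ins for illustration only.
use std::collections::HashMap;

/// Stand-in for the per-table relation description the real crate resolves.
#[derive(Clone, Debug)]
struct TableSchema {
    column_names: Vec<String>,
}

/// The context asks a pluggable source for table metadata instead of
/// carrying a pre-filled schema map itself.
trait FlowTableSource {
    fn table_schema(&self, table_name: &[String; 3]) -> Option<TableSchema>;
}

/// Test-only source over a fixed set of tables (the `FlowDummyTableSource` role).
#[derive(Default)]
struct DummyTableSource {
    tables: HashMap<[String; 3], TableSchema>,
}

impl FlowTableSource for DummyTableSource {
    fn table_schema(&self, table_name: &[String; 3]) -> Option<TableSchema> {
        self.tables.get(table_name).cloned()
    }
}

/// The `FlownodeContext` role: owns whichever source it was constructed with.
struct Context {
    table_source: Box<dyn FlowTableSource>,
}

impl Context {
    fn new(table_source: Box<dyn FlowTableSource>) -> Self {
        Self { table_source }
    }
}

fn main() {
    let mut dummy = DummyTableSource::default();
    let name = [
        "greptime".to_string(),
        "public".to_string(),
        "numbers".to_string(),
    ];
    dummy.tables.insert(
        name.clone(),
        TableSchema {
            column_names: vec!["number".to_string()],
        },
    );

    // Tests construct the context the same way the new create_test_ctx does:
    // hand it a dummy source rather than pre-inserting schemas.
    let ctx = Context::new(Box::new(dummy));
    assert!(ctx.table_source.table_schema(&name).is_some());
}
```

Hiding the source behind a trait object is what lets the new `create_test_ctx` drop its schema bookkeeping: the test_utils.rs hunk above only registers table names and ids, then hands `FlownodeContext::new` a `Box::new(FlowDummyTableSource::default())`.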