diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 556761c4781b..0bb9bb36f43b 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -46,9 +46,8 @@ use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::{broadcast, watch, Mutex, RwLock}; pub(crate) use crate::adapter::node_context::FlownodeContext; -use crate::adapter::table_source::ManagedTableSource; use crate::adapter::refill::RefillTask; -use crate::adapter::table_source::KvBackendTableSource; +use crate::adapter::table_source::ManagedTableSource; use crate::adapter::util::relation_desc_to_column_schemas_with_fallback; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; @@ -737,7 +736,7 @@ impl CreateFlowArgs { /// Create&Remove flow impl FlowWorkerManager { /// Get table info source - pub fn table_info_source(&self) -> &KvBackendTableSource { + pub fn table_info_source(&self) -> &ManagedTableSource { &self.table_info_source } diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index 015616d94c29..e170ad4f232f 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -29,7 +29,7 @@ use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; use super::{FlowId, FlowWorkerManager}; -use crate::adapter::table_source::KvBackendTableSource; +use crate::adapter::table_source::ManagedTableSource; use crate::adapter::FlowWorkerManagerRef; use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu}; use crate::expr::error::ExternalSnafu; @@ -331,7 +331,7 @@ impl RefillTask { table_id: TableId, time_range: Option<(common_time::Timestamp, common_time::Timestamp)>, time_col_name: &str, - table_src: &KvBackendTableSource, + table_src: &ManagedTableSource, ) -> Result { let (table_name, table_schema) = table_src.get_table_name_schema(&table_id).await?; let all_col_names: BTreeSet<_> = table_schema diff --git a/src/flow/src/test_utils.rs b/src/flow/src/test_utils.rs index d7e6172fc469..3cc42efda596 100644 --- a/src/flow/src/test_utils.rs +++ b/src/flow/src/test_utils.rs @@ -36,7 +36,7 @@ use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::test_util::MemTable; use crate::adapter::node_context::IdToNameMap; -use crate::adapter::table_source::FlowDummyTableSource; +use crate::adapter::table_source::test::FlowDummyTableSource; use crate::adapter::FlownodeContext; use crate::df_optimizer::apply_df_optimizer; use crate::expr::GlobalId; diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 46f4d1e6cf82..e2a9c2ff8059 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -156,11 +156,6 @@ mod test { use query::parser::QueryLanguageParser; use session::context::QueryContext; - use super::*; - use crate::adapter::node_context::IdToNameMap; - use crate::adapter::table_source::test::FlowDummyTableSource; - use crate::df_optimizer::apply_df_optimizer; - use crate::expr::GlobalId; use crate::df_optimizer::apply_df_optimizer; use crate::test_utils::create_test_query_engine;