chore: after rebase
discord9 committed Dec 30, 2024
1 parent 2b888b1 commit cd3df5d
Showing 5 changed files with 14 additions and 154 deletions.
27 changes: 3 additions & 24 deletions src/flow/src/adapter.rs
@@ -46,11 +46,9 @@ use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, watch, Mutex, RwLock};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::table_source::KvBackendTableSource;
use crate::adapter::refill::RefillTask;
use crate::adapter::util::{
relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc,
};
use crate::adapter::table_source::KvBackendTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
@@ -738,7 +736,7 @@ impl CreateFlowArgs {
/// Create&Remove flow
impl FlowWorkerManager {
/// Get table info source
pub fn table_info_source(&self) -> &TableSource {
pub fn table_info_source(&self) -> &KvBackendTableSource {
&self.table_info_source
}

@@ -814,27 +812,8 @@ impl FlowWorkerManager {
.fail()?,
}
}

let table_id = self
.table_info_source
.get_table_id_from_name(sink_table_name)
.await?
.context(UnexpectedSnafu {
reason: format!("Can't get table id for table name {:?}", sink_table_name),
})?;
let table_info_value = self
.table_info_source
.get_table_info_value(&table_id)
.await?
.context(UnexpectedSnafu {
reason: format!("Can't get table info value for table id {:?}", table_id),
})?;
let real_schema = table_info_value_to_relation_desc(table_info_value)?;
node_ctx.assign_table_schema(sink_table_name, real_schema.clone())?;
} else {
// assign inferred schema to sink table
// create sink table
node_ctx.assign_table_schema(sink_table_name, flow_plan.schema.clone())?;
let did_create = self
.create_table_from_relation(
&format!("flow-id={flow_id}"),
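The net effect of the adapter.rs hunks: `table_info_source()` now exposes the concrete `KvBackendTableSource`, and the manual sink-table schema lookup (`get_table_id_from_name` / `get_table_info_value` / `table_info_value_to_relation_desc` / `assign_table_schema`) is dropped from flow creation. A minimal, self-contained sketch of the accessor change follows; the surrounding struct layout is simplified and assumed, not the real one.

```rust
// Illustrative sketch only: the real FlowWorkerManager has many more fields,
// and KvBackendTableSource lives in crate::adapter::table_source.
struct KvBackendTableSource;

struct FlowWorkerManager {
    table_info_source: KvBackendTableSource,
}

impl FlowWorkerManager {
    /// The accessor now returns the concrete KV-backend source instead of the
    /// old `TableSource` name, so downstream signatures (e.g. in refill.rs
    /// below) change in lockstep.
    fn table_info_source(&self) -> &KvBackendTableSource {
        &self.table_info_source
    }
}

fn main() {
    let manager = FlowWorkerManager {
        table_info_source: KvBackendTableSource,
    };
    let _source: &KvBackendTableSource = manager.table_info_source();
}
```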
1 change: 1 addition & 0 deletions src/flow/src/adapter/node_context.rs
@@ -67,6 +67,7 @@ impl FlownodeContext {
source_to_tasks: Default::default(),
flow_to_sink: Default::default(),
sink_to_flow: Default::default(),
flow_plans: Default::default(),
source_sender: Default::default(),
sink_receiver: Default::default(),
table_source,
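The node_context.rs hunk only adds `flow_plans: Default::default()` to the constructor. A rough, self-contained sketch of that constructor shape; the field types and the `FlowTableSource` trait name are placeholders, not the crate's real definitions.

```rust
use std::collections::BTreeMap;

// Placeholder trait standing in for whatever boxed table-source abstraction
// FlownodeContext::new actually receives.
trait FlowTableSource {}

struct FlownodeContext {
    // Placeholder type; the real map is keyed/valued differently.
    flow_plans: BTreeMap<u64, String>,
    table_source: Box<dyn FlowTableSource>,
    // ... remaining fields elided
}

impl FlownodeContext {
    fn new(table_source: Box<dyn FlowTableSource>) -> Self {
        Self {
            // Every default-able collection must be initialized here; this
            // commit adds the corresponding line for the new `flow_plans` field.
            flow_plans: Default::default(),
            table_source,
        }
    }
}

struct DummySource;
impl FlowTableSource for DummySource {}

fn main() {
    let ctx = FlownodeContext::new(Box::new(DummySource));
    assert!(ctx.flow_plans.is_empty());
}
```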
4 changes: 2 additions & 2 deletions src/flow/src/adapter/refill.rs
@@ -29,7 +29,7 @@ use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;

use super::{FlowId, FlowWorkerManager};
use crate::adapter::table_source::TableSource;
use crate::adapter::table_source::KvBackendTableSource;
use crate::adapter::FlowWorkerManagerRef;
use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu};
use crate::expr::error::ExternalSnafu;
@@ -331,7 +331,7 @@ impl RefillTask {
table_id: TableId,
time_range: Option<(common_time::Timestamp, common_time::Timestamp)>,
time_col_name: &str,
table_src: &TableSource,
table_src: &KvBackendTableSource,
) -> Result<RefillTask, Error> {
let (table_name, table_schema) = table_src.get_table_name_schema(&table_id).await?;
let all_col_names: BTreeSet<_> = table_schema
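The refill.rs hunk is the matching signature change: the refill constructor now borrows `&KvBackendTableSource`. Below is a small sketch of the call pattern visible in the hunk (`get_table_name_schema`), with the source type stubbed out since the real implementation is backed by the KV metadata store and its full API is not shown here.

```rust
type TableId = u32; // stand-in for table::metadata::TableId

// Stub mirroring only the one method visible in the hunk; the real
// KvBackendTableSource resolves table names and schemas from the KV backend.
struct KvBackendTableSource;

impl KvBackendTableSource {
    async fn get_table_name_schema(
        &self,
        table_id: &TableId,
    ) -> Result<(String, Vec<String>), String> {
        Ok((
            format!("table_{table_id}"),
            vec!["number".into(), "ts".into()],
        ))
    }
}

// Shape of the changed parameter: callers now pass the concrete source type.
async fn collect_refill_inputs(
    table_id: TableId,
    table_src: &KvBackendTableSource,
) -> Result<(String, Vec<String>), String> {
    // mirrors `table_src.get_table_name_schema(&table_id).await?` in the diff
    table_src.get_table_name_schema(&table_id).await
}

fn main() {
    let src = KvBackendTableSource;
    // Constructing the future only; driving it would normally happen in tokio.
    let _future = collect_refill_inputs(1024, &src);
}
```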
28 changes: 8 additions & 20 deletions src/flow/src/test_utils.rs
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use catalog::RegisterTableRequest;
@@ -37,14 +36,13 @@ use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::test_util::MemTable;

use crate::adapter::node_context::IdToNameMap;
use crate::adapter::table_source::FlowDummyTableSource;
use crate::adapter::FlownodeContext;
use crate::df_optimizer::apply_df_optimizer;
use crate::expr::GlobalId;
use crate::repr::{ColumnType, RelationType};
use crate::transform::register_function_to_query_engine;

pub fn create_test_ctx() -> FlownodeContext {
let mut schemas = HashMap::new();
let mut tri_map = IdToNameMap::new();
{
let gid = GlobalId::User(0);
@@ -53,10 +51,7 @@ pub fn create_test_ctx() -> FlownodeContext {
"public".to_string(),
"numbers".to_string(),
];
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);

tri_map.insert(Some(name.clone()), Some(1024), gid);
schemas.insert(gid, schema.into_named(vec![Some("number".to_string())]));
}

{
Expand All @@ -66,23 +61,16 @@ pub fn create_test_ctx() -> FlownodeContext {
"public".to_string(),
"numbers_with_ts".to_string(),
];
let schema = RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
]);
schemas.insert(
gid,
schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
);
tri_map.insert(Some(name.clone()), Some(1025), gid);
}

FlownodeContext {
schema: schemas,
table_repr: tri_map,
query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),
..Default::default()
}
let dummy_source = FlowDummyTableSource::default();

let mut ctx = FlownodeContext::new(Box::new(dummy_source));
ctx.table_repr = tri_map;
ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public")));

ctx
}

pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
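The rewritten create_test_ctx above no longer builds a schema map by hand; it registers the two table names in table_repr and hands a FlowDummyTableSource to FlownodeContext::new. A hypothetical consumer of these shared helpers might look like the following; the planning call in the comment is illustrative only, since sql_to_flow_plan's exact signature is not shown in this commit.

```rust
// Hypothetical test using the shared helpers from crate::test_utils.
use crate::test_utils::{create_test_ctx, create_test_query_engine};

#[tokio::test]
async fn smoke_test_shared_helpers() {
    let engine = create_test_query_engine();
    let ctx = create_test_ctx();

    // `ctx.table_repr` maps greptime.public.numbers -> 1024 and
    // greptime.public.numbers_with_ts -> 1025, so planning code can resolve
    // those names (e.g. through crate::df_optimizer::sql_to_flow_plan).
    assert_eq!("datafusion", engine.name());
    drop(ctx);
}
```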
108 changes: 0 additions & 108 deletions src/flow/src/transform.rs
@@ -156,117 +156,9 @@
use query::parser::QueryLanguageParser;
use session::context::QueryContext;

use super::*;
use crate::adapter::node_context::IdToNameMap;
use crate::adapter::table_source::FlowDummyTableSource;
use crate::df_optimizer::apply_df_optimizer;
use crate::expr::GlobalId;
use crate::df_optimizer::apply_df_optimizer;
use crate::test_utils::create_test_query_engine;

pub fn create_test_ctx() -> FlownodeContext {
let mut tri_map = IdToNameMap::new();
{
let gid = GlobalId::User(0);
let name = [
"greptime".to_string(),
"public".to_string(),
"numbers".to_string(),
];
tri_map.insert(Some(name.clone()), Some(1024), gid);
}

{
let gid = GlobalId::User(1);
let name = [
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
];
tri_map.insert(Some(name.clone()), Some(1025), gid);
}

let dummy_source = FlowDummyTableSource::default();

let mut ctx = FlownodeContext::new(Box::new(dummy_source));
ctx.table_repr = tri_map;
ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public")));

ctx
}

pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
let req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: NUMBERS_TABLE_NAME.to_string(),
table_id: NUMBERS_TABLE_ID,
table: NumbersTable::table(NUMBERS_TABLE_ID),
};
catalog_list.register_table_sync(req).unwrap();

let schema = vec![
datatypes::schema::ColumnSchema::new("number", CDT::uint32_datatype(), false),
datatypes::schema::ColumnSchema::new(
"ts",
CDT::timestamp_millisecond_datatype(),
false,
),
];
let mut columns = vec![];
let numbers = (1..=10).collect_vec();
let column: VectorRef = Arc::new(<u32 as Scalar>::VectorType::from_vec(numbers));
columns.push(column);

let ts = (1..=10).collect_vec();
let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10);
ts.into_iter()
.map(|v| builder.push(Some(TimestampMillisecond::new(v))))
.count();
let column: VectorRef = builder.to_vector_cloned();
columns.push(column);

let schema = Arc::new(Schema::new(schema));
let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap();
let table = MemTable::table("numbers_with_ts", recordbatch);

let req_with_ts = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "numbers_with_ts".to_string(),
table_id: 1024,
table,
};
catalog_list.register_table_sync(req_with_ts).unwrap();

let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);

let engine = factory.query_engine();
register_function_to_query_engine(&engine);

assert_eq!("datafusion", engine.name());
engine
}

pub async fn sql_to_substrait(engine: Arc<dyn QueryEngine>, sql: &str) -> proto::Plan {
// let engine = create_test_query_engine();
let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
let plan = engine
.planner()
.plan(&stmt, QueryContext::arc())
.await
.unwrap();
let plan = apply_df_optimizer(plan).await.unwrap();

// encode then decode so to rely on the impl of conversion from logical plan to substrait plan
let bytes = DFLogicalSubstraitConvertor {}
.encode(&plan, DefaultSerializer)
.unwrap();

proto::Plan::decode(bytes).unwrap()
}

/// TODO(discord9): add more illegal sql tests
#[tokio::test]
async fn test_missing_key_check() {
