Skip to content

Commit

Permalink
refactor(table): remove unused table requests (#3603)
Browse files Browse the repository at this point in the history
* refactor(table): remove unused requests

Signed-off-by: Zhenchi <[email protected]>

* update comments

Signed-off-by: Zhenchi <[email protected]>

* fix: clippy

Signed-off-by: Zhenchi <[email protected]>

* fix: compile

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Mar 28, 2024
1 parent 06a9052 commit 63681f0
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 241 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ testing = []
workspace = true

[dependencies]
api.workspace = true
arrow.workspace = true
arrow-schema.workspace = true
async-stream.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use api::v1::CreateTableExpr;
use futures::future::BoxFuture;
use futures_util::stream::BoxStream;
use table::metadata::TableId;
use table::requests::CreateTableRequest;
use table::TableRef;

use crate::error::Result;
Expand Down Expand Up @@ -75,9 +75,9 @@ pub type OpenSystemTableHook =
/// Register system table request:
/// - When system table is already created and registered, the hook will be called
/// with table ref after opening the system table
/// - When system table is not exists, create and register the table by create_table_request and calls open_hook with the created table.
/// - When the system table does not exist, create and register the table by `create_table_expr` and call `open_hook` with the created table.
pub struct RegisterSystemTableRequest {
pub create_table_request: CreateTableRequest,
pub create_table_expr: CreateTableExpr,
pub open_hook: Option<OpenSystemTableHook>,
}

Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ async fn open_all_regions(
));
}
}
info!("going to open {} regions", regions.len());
info!("going to open {} region(s)", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];

Expand Down
7 changes: 1 addition & 6 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ pub enum Error {
#[snafu(display("Invalid DeleteRequest, reason: {}", reason))]
InvalidDeleteRequest { reason: String, location: Location },

#[snafu(display("Invalid system table definition: {err_msg}"))]
InvalidSystemTableDef { err_msg: String, location: Location },

#[snafu(display("Table not found: {}", table_name))]
TableNotFound { table_name: String },

Expand Down Expand Up @@ -322,9 +319,7 @@ impl ErrorExt for Error {
| Error::VectorToGrpcColumn { .. }
| Error::InvalidRegionRequest { .. } => StatusCode::Internal,

Error::ContextValueNotFound { .. } | Error::InvalidSystemTableDef { .. } => {
StatusCode::Unexpected
}
Error::ContextValueNotFound { .. } => StatusCode::Unexpected,

Error::TableNotFound { .. } => StatusCode::TableNotFound,

Expand Down
63 changes: 6 additions & 57 deletions src/frontend/src/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,19 @@ mod dummy {
mod python {
use api::v1::ddl_request::Expr;
use api::v1::greptime_request::Request;
use api::v1::{CreateTableExpr, DdlRequest};
use api::v1::DdlRequest;
use arc_swap::ArcSwap;
use catalog::RegisterSystemTableRequest;
use common_error::ext::BoxedError;
use common_meta::table_name::TableName;
use common_telemetry::{error, info};
use operator::expr_factory;
use script::manager::ScriptManager;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::requests::CreateTableRequest;

use super::*;
use crate::error::{CatalogSnafu, InvalidSystemTableDefSnafu, TableNotFoundSnafu};
use crate::error::{CatalogSnafu, TableNotFoundSnafu};
use crate::instance::Instance;

/// A placeholder for the real gRPC handler.
Expand Down Expand Up @@ -148,17 +146,13 @@ mod python {
}

let RegisterSystemTableRequest {
create_table_request: request,
create_table_expr: expr,
open_hook,
} = self.script_manager.create_table_request(catalog);

if let Some(table) = self
.catalog_manager
.table(
&request.catalog_name,
&request.schema_name,
&request.table_name,
)
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
{
Expand All @@ -171,13 +165,8 @@ mod python {
return Ok(());
}

let table_name = TableName::new(
&request.catalog_name,
&request.schema_name,
&request.table_name,
);

let expr = Self::create_table_expr(request)?;
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);

let _ = self
.grpc_handler
Expand Down Expand Up @@ -217,46 +206,6 @@ mod python {
Ok(())
}

fn create_table_expr(request: CreateTableRequest) -> Result<CreateTableExpr> {
let column_schemas = request.schema.column_schemas;

let time_index = column_schemas
.iter()
.find_map(|x| {
if x.is_time_index() {
Some(x.name.clone())
} else {
None
}
})
.context(InvalidSystemTableDefSnafu {
err_msg: "Time index is not defined.",
})?;

let primary_keys = request
.primary_key_indices
.iter()
// Indexing has to be safe because the create script table request is pre-defined.
.map(|i| column_schemas[*i].name.clone())
.collect::<Vec<_>>();

let column_defs = expr_factory::column_schemas_to_defs(column_schemas, &primary_keys)?;

Ok(CreateTableExpr {
catalog_name: request.catalog_name,
schema_name: request.schema_name,
table_name: request.table_name,
desc: request.desc.unwrap_or_default(),
column_defs,
time_index,
primary_keys,
create_if_not_exists: request.create_if_not_exists,
table_options: (&request.table_options).into(),
table_id: None, // Should and will be assigned by Meta.
engine: request.engine,
})
}

pub async fn insert_script(
&self,
query_ctx: QueryContextRef,
Expand Down
27 changes: 13 additions & 14 deletions src/script/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,25 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use api::v1::CreateTableExpr;
use arc_swap::ArcSwap;
use catalog::{OpenSystemTableHook, RegisterSystemTableRequest};
use common_catalog::consts::{default_engine, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID};
use common_catalog::consts::{default_engine, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_query::Output;
use common_telemetry::logging;
use futures::future::FutureExt;
use query::QueryEngineRef;
use servers::query_handler::grpc::GrpcQueryHandlerRef;
use snafu::{OptionExt, ResultExt};
use table::requests::{CreateTableRequest, TableOptions};
use table::TableRef;

use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine};
use crate::error::{
CompilePythonSnafu, ExecutePythonSnafu, Result, ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu,
};
use crate::python::{PyEngine, PyScript};
use crate::table::{
build_scripts_schema, get_primary_key_indices, ScriptsTable, ScriptsTableRef,
SCRIPTS_TABLE_NAME,
};
use crate::table::{build_scripts_schema, ScriptsTable, ScriptsTableRef, SCRIPTS_TABLE_NAME};

pub struct ScriptManager<E: ErrorExt + Send + Sync + 'static> {
compiled: RwLock<HashMap<String, Arc<PyScript>>>,
Expand Down Expand Up @@ -69,19 +66,21 @@ impl<E: ErrorExt + Send + Sync + 'static> ScriptManager<E> {
}

pub fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
let request = CreateTableRequest {
id: SCRIPTS_TABLE_ID,
let (time_index, primary_keys, column_defs) = build_scripts_schema();

let create_table_expr = CreateTableExpr {
catalog_name: catalog.to_string(),
// TODO(dennis): put the scripts table into `system` schema?
// We always put the scripts table into `public` schema right now.
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: SCRIPTS_TABLE_NAME.to_string(),
desc: Some("GreptimeDB scripts table for Python".to_string()),
schema: build_scripts_schema(),
region_numbers: vec![0],
primary_key_indices: get_primary_key_indices(),
desc: "GreptimeDB scripts table for Python".to_string(),
column_defs,
time_index,
primary_keys,
create_if_not_exists: true,
table_options: TableOptions::default(),
table_options: Default::default(),
table_id: None, // Should and will be assigned by Meta.
engine: default_engine().to_string(),
};

Expand All @@ -94,7 +93,7 @@ impl<E: ErrorExt + Send + Sync + 'static> ScriptManager<E> {
});

RegisterSystemTableRequest {
create_table_request: request,
create_table_expr,
open_hook: Some(hook),
}
}
Expand Down
59 changes: 27 additions & 32 deletions src/script/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
//! Scripts table
use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::greptime_request::Request;
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnSchema as PbColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows,
SemanticType,
ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
RowInsertRequests, Rows, SemanticType,
};
use catalog::error::CompileScriptInternalSnafu;
use common_error::ext::{BoxedError, ErrorExt};
Expand All @@ -33,7 +32,6 @@ use datafusion::logical_expr::{and, col, lit};
use datafusion_common::TableReference;
use datafusion_expr::LogicalPlanBuilder;
use datatypes::prelude::ScalarVector;
use datatypes::schema::{ColumnSchema, RawSchema};
use datatypes::vectors::{StringVector, Vector};
use query::plan::LogicalPlan;
use query::QueryEngineRef;
Expand Down Expand Up @@ -344,38 +342,35 @@ fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
.build()
}

/// Returns the scripts schema's primary key indices
pub fn get_primary_key_indices() -> Vec<usize> {
let mut indices = vec![];
for (index, c) in build_insert_column_schemas().into_iter().enumerate() {
if c.semantic_type == (SemanticType::Tag as i32) {
indices.push(index);
}
}
/// Builds scripts schema, returns (time index, primary keys, column defs)
pub fn build_scripts_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
let cols = build_insert_column_schemas();

indices
}
let time_index = cols
.iter()
.find_map(|c| {
(c.semantic_type == (SemanticType::Timestamp as i32)).then(|| c.column_name.clone())
})
.unwrap(); // Safety: the column always exists

/// Build scripts table
pub fn build_scripts_schema() -> RawSchema {
let cols = build_insert_column_schemas()
let primary_keys = cols
.iter()
.filter(|c| (c.semantic_type == (SemanticType::Tag as i32)))
.map(|c| c.column_name.clone())
.collect();

let column_defs = cols
.into_iter()
.map(|c| {
let cs = ColumnSchema::new(
c.column_name,
// Safety: the type always exists
ColumnDataTypeWrapper::try_new(c.datatype, c.datatype_extension)
.unwrap()
.into(),
false,
);
if c.semantic_type == SemanticType::Timestamp as i32 {
cs.with_time_index(true)
} else {
cs
}
.map(|c| ColumnDef {
name: c.column_name,
data_type: c.datatype,
is_nullable: false,
default_constraint: vec![],
semantic_type: c.semantic_type,
comment: "".to_string(),
datatype_extension: None,
})
.collect();

RawSchema::new(cols)
(time_index, primary_keys, column_defs)
}
Loading

0 comments on commit 63681f0

Please sign in to comment.