Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support set database of the current session #19786

Merged
merged 7 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl Binder {
) -> Binder {
Binder {
catalog: session.env().catalog_reader().read_guard(),
db_name: session.database().to_owned(),
db_name: session.database(),
session_id: session.id(),
context: BindContext::new(),
auth_context: session.auth_context(),
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_owner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ pub async fn handle_alter_owner(
stmt_type: StatementType,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_obj_name) =
Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let new_owner_name = Binder::resolve_user_name(vec![new_owner_name].into())?;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ pub async fn handle_alter_parallelism(
deferred: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let table_id = {
Expand Down
30 changes: 15 additions & 15 deletions src/frontend/src/handler/alter_rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ pub async fn handle_rename_table(
new_table_name: ObjectName,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
let new_table_name = Binder::resolve_table_name(new_table_name)?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down Expand Up @@ -79,12 +79,12 @@ pub async fn handle_rename_index(
new_index_name: ObjectName,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_index_name) =
Binder::resolve_schema_qualified_name(db_name, index_name.clone())?;
let new_index_name = Binder::resolve_index_name(new_index_name)?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down Expand Up @@ -113,12 +113,12 @@ pub async fn handle_rename_view(
new_view_name: ObjectName,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_view_name) =
Binder::resolve_schema_qualified_name(db_name, view_name.clone())?;
let new_view_name = Binder::resolve_view_name(new_view_name)?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand All @@ -143,12 +143,12 @@ pub async fn handle_rename_sink(
new_sink_name: ObjectName,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_sink_name) =
Binder::resolve_schema_qualified_name(db_name, sink_name.clone())?;
let new_sink_name = Binder::resolve_sink_name(new_sink_name)?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down Expand Up @@ -176,12 +176,12 @@ pub async fn handle_rename_subscription(
new_subscription_name: ObjectName,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_subscription_name) =
Binder::resolve_schema_qualified_name(db_name, subscription_name.clone())?;
let new_subscription_name = Binder::resolve_subscription_name(new_subscription_name)?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down Expand Up @@ -210,12 +210,12 @@ pub async fn handle_rename_source(
new_source_name: ObjectName,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_source_name) =
Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
let new_source_name = Binder::resolve_source_name(new_source_name)?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down Expand Up @@ -253,7 +253,7 @@ pub async fn handle_rename_schema(
new_schema_name: ObjectName,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let schema_name = Binder::resolve_schema_name(schema_name)?;
let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;

Expand All @@ -275,7 +275,7 @@ pub async fn handle_rename_schema(
session.check_privilege_for_drop_alter_db_schema(schema)?;

// To rename a schema you must also have the CREATE privilege for the database.
if let Some(user) = user_reader.get_user_by_name(session.user_name()) {
if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
if !user.is_super
&& !user
.check_privilege(&grant_privilege::Object::DatabaseId(db_id), AclMode::Create)
Expand Down Expand Up @@ -321,7 +321,7 @@ pub async fn handle_rename_database(
session.check_privilege_for_drop_alter_db_schema(database)?;

// Non-superuser owners must also have the CREATEDB privilege.
if let Some(user) = user_reader.get_user_by_name(session.user_name()) {
if let Some(user) = user_reader.get_user_by_name(&session.user_name()) {
if !user.is_super && !user.can_create_db {
return Err(ErrorCode::PermissionDenied(
"Non-superuser owners must also have the CREATEDB privilege".to_owned(),
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_set_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ pub async fn handle_alter_set_schema(
func_args: Option<Vec<OperateFunctionArg>>,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_obj_name) =
Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let new_schema_name = Binder::resolve_schema_name(new_schema_name)?;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ pub async fn handle_alter_source_column(
) -> Result<RwPgResponse> {
// Get original definition
let session = handler_args.session.clone();
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_source_name) =
Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ pub fn fetch_source_catalog_with_db_schema_id(
session: &SessionImpl,
name: &ObjectName,
) -> Result<(Arc<SourceCatalog>, DatabaseId, SchemaId)> {
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_source_name) =
Binder::resolve_schema_qualified_name(db_name, name.clone())?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ pub async fn handle_alter_streaming_rate_limit(
rate_limit: i32,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_swap_rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ pub async fn handle_swap_rename(
stmt_type: StatementType,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (src_schema_name, src_obj_name) =
Binder::resolve_schema_qualified_name(db_name, source_object)?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();
let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name);
let (target_schema_name, target_obj_name) =
Binder::resolve_schema_qualified_name(db_name, target_object)?;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,11 +479,11 @@ pub fn fetch_table_catalog_for_alter(
session: &SessionImpl,
table_name: &ObjectName,
) -> Result<Arc<TableCatalog>> {
let db_name = session.database();
let db_name = &session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ pub async fn handle_alter_user(
.to_prost();

let session_user = user_reader
.get_user_by_name(session.user_name())
.get_user_by_name(&session.user_name())
.ok_or_else(|| CatalogError::NotFound("user", session.user_name().to_owned()))?;

match stmt.mode {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/close_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub async fn handle_close_cursor(
) -> Result<RwPgResponse> {
let session = handle_args.session.clone();
let cursor_manager = session.get_cursor_manager();
let db_name = session.database();
let db_name = &session.database();
if let Some(cursor_name) = stmt.cursor_name {
let (_, cursor_name) = Binder::resolve_schema_qualified_name(db_name, cursor_name.clone())?;
cursor_manager.remove_cursor(cursor_name).await?;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/comment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub async fn handle_comment(
};

let (schema, table) = Binder::resolve_schema_qualified_name(
session.database(),
&session.database(),
ObjectName(tab.to_vec()),
)?;

Expand All @@ -64,7 +64,7 @@ pub async fn handle_comment(
}
CommentObject::Table => {
let (schema, table) =
Binder::resolve_schema_qualified_name(session.database(), object_name)?;
Binder::resolve_schema_qualified_name(&session.database(), object_name)?;
let (database_id, schema_id) =
session.get_database_and_schema_id_for_create(schema.clone())?;
let table = binder.bind_table(schema.as_deref(), &table, None)?;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub async fn handle_create_aggregate(

// resolve database and schema id
let session = &handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub async fn handle_create_connection(
stmt: CreateConnectionStatement,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();
let db_name = session.database();
let db_name = &session.database();
let (schema_name, connection_name) =
Binder::resolve_schema_qualified_name(db_name, stmt.connection_name.clone())?;

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn handle_create_database(
{
let user_reader = session.env().user_info_reader();
let reader = user_reader.read_guard();
if let Some(info) = reader.get_user_by_name(session.user_name()) {
if let Some(info) = reader.get_user_by_name(&session.user_name()) {
if !info.can_create_db && !info.is_super {
return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub async fn handle_create_function(

// resolve database and schema id
let session = &handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ pub(crate) fn resolve_index_schema(
index_name: ObjectName,
table_name: ObjectName,
) -> Result<(String, Arc<TableCatalog>, String)> {
let db_name = session.database();
let db_name = &session.database();
let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let user_name = &session.user_name();
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let index_table_name = Binder::resolve_index_name(index_name)?;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub fn gen_create_mv_plan_bound(
context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
}

let db_name = session.database();
let db_name = &session.database();
let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?;

let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn handle_create_schema(
owner: Option<ObjectName>,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let database_name = session.database();
let database_name = &session.database();
let schema_name = Binder::resolve_schema_name(schema_name)?;

if schema_name.starts_with(RESERVED_PG_SCHEMA_PREFIX) {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn handle_create_secret(
.map_err(|e| anyhow::anyhow!(e))?;

let session = handler_args.session.clone();
let db_name = session.database();
let db_name = &session.database();
let (schema_name, secret_name) =
Binder::resolve_schema_qualified_name(db_name, stmt.secret_name.clone())?;

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub async fn gen_sink_plan(
let session = handler_args.session.clone();
let session = session.as_ref();
let user_specified_columns = !stmt.columns.is_empty();
let db_name = session.database();
let db_name = &session.database();
let (sink_schema_name, sink_table_name) =
Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;

Expand Down Expand Up @@ -487,7 +487,7 @@ pub fn fetch_incoming_sinks(
) -> Result<Vec<Arc<SinkCatalog>>> {
let reader = session.env().catalog_reader().read_guard();
let mut sinks = Vec::with_capacity(incoming_sink_ids.len());
let db_name = session.database();
let db_name = &session.database();
for schema in reader.iter_schemas(db_name)? {
for sink in schema.iter_sink() {
if incoming_sink_ids.contains(&sink.id.sink_id) {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ pub async fn bind_create_source_or_table_with_connector(
source_rate_limit: Option<u32>,
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
let session = &handler_args.session;
let db_name: &str = session.database();
let db_name: &str = &session.database();
let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, full_name)?;
let (database_id, schema_id) =
session.get_database_and_schema_id_for_create(schema_name.clone())?;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sql_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ pub async fn handle_create_sql_function(

// resolve database and schema id
let session = &handler_args.session;
let db_name = session.database();
let db_name = &session.database();
let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn create_subscription_catalog(
context: OptimizerContextRef,
stmt: CreateSubscriptionStatement,
) -> Result<SubscriptionCatalog> {
let db_name = session.database();
let db_name = &session.database();
let (subscription_schema_name, subscription_name) =
Binder::resolve_schema_qualified_name(db_name, stmt.subscription_name.clone())?;
let (table_schema_name, subscription_from_table_name) =
Expand Down
Loading
Loading