Skip to content

Commit

Permalink
fix: bind under schema path for function, connection, subscription to…
Browse files Browse the repository at this point in the history
… avoid misjudgment of not found error (#19640)
  • Loading branch information
yezizp2012 authored Dec 3, 2024
1 parent e9c15eb commit 2c0d286
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 60 deletions.
35 changes: 21 additions & 14 deletions src/frontend/src/binder/expr/function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ impl Binder {
over,
}: Function,
) -> Result<ExprImpl> {
let func_name = match name.0.as_slice() {
[name] => name.real_value(),
let (schema_name, func_name) = match name.0.as_slice() {
[name] => (None, name.real_value()),
[schema, name] => {
let schema_name = schema.real_value();
if schema_name == PG_CATALOG_SCHEMA_NAME {
let func_name = if schema_name == PG_CATALOG_SCHEMA_NAME {
// pg_catalog is always effectively part of the search path, so we can always bind the function.
// Ref: https://www.postgresql.org/docs/current/ddl-schemas.html#DDL-SCHEMAS-CATALOG
name.real_value()
Expand All @@ -109,7 +109,8 @@ impl Binder {
"Unsupported function name under schema: {}",
schema_name
);
}
};
(Some(schema_name), func_name)
}
_ => bail_not_implemented!(issue = 112, "qualified function {}", name),
};
Expand Down Expand Up @@ -166,9 +167,13 @@ impl Binder {
InputRef::new(i, DataType::List(Box::new(expr.return_type()))).into()
})
.collect_vec();
let scalar_func_expr = if let Ok(schema) = self.first_valid_schema()
&& let Some(func) = schema.get_function_by_name_inputs(&func_name, &mut array_args)
{
let schema_path = self.bind_schema_path(schema_name.as_deref());
let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
&self.db_name,
schema_path,
&func_name,
&mut array_args,
) {
// record the dependency upon the UDF
referred_udfs.insert(func.id);

Expand All @@ -194,12 +199,14 @@ impl Binder {
None
};

let schema_path = self.bind_schema_path(schema_name.as_deref());
let udf = if wrapped_agg_type.is_none()
&& let Ok(schema) = self.first_valid_schema()
&& let Some(func) = schema
.get_function_by_name_inputs(&func_name, &mut args)
.cloned()
{
&& let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
&self.db_name,
schema_path,
&func_name,
&mut args,
) {
// record the dependency upon the UDF
referred_udfs.insert(func.id);

Expand Down Expand Up @@ -228,11 +235,11 @@ impl Binder {
over.is_some(),
format!("`OVER` is not allowed in {} call", name)
);
return self.bind_sql_udf(func, args);
return self.bind_sql_udf(func.clone(), args);
}

// now `func` is a non-SQL user-defined scalar/aggregate/table function
Some(func)
Some(func.clone())
} else {
None
};
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub use update::{BoundUpdate, UpdateProject};
pub use values::BoundValues;

use crate::catalog::catalog_service::CatalogReadGuard;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::schema_catalog::SchemaCatalog;
use crate::catalog::{CatalogResult, TableId, ViewId};
use crate::error::ErrorCode;
Expand Down Expand Up @@ -506,6 +507,10 @@ impl Binder {
)
}

fn bind_schema_path<'a>(&'a self, schema_name: Option<&'a str>) -> SchemaPath<'a> {
SchemaPath::new(schema_name, &self.search_path, &self.auth_context.user_name)
}

pub fn set_clause(&mut self, clause: Option<Clause>) {
self.context.clause = clause;
}
Expand Down
10 changes: 2 additions & 8 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,7 @@ impl Binder {
alias: Option<TableAlias>,
) -> Result<BoundBaseTable> {
let db_name = &self.db_name;
let schema_path = match schema_name {
Some(schema_name) => SchemaPath::Name(schema_name),
None => SchemaPath::Path(&self.search_path, &self.auth_context.user_name),
};
let schema_path = self.bind_schema_path(schema_name);
let (table_catalog, schema_name) =
self.catalog
.get_created_table_by_name(db_name, schema_path, table_name)?;
Expand Down Expand Up @@ -358,10 +355,7 @@ impl Binder {
is_insert: bool,
) -> Result<&'a TableCatalog> {
let db_name = &self.db_name;
let schema_path = match schema_name {
Some(schema_name) => SchemaPath::Name(schema_name),
None => SchemaPath::Path(&self.search_path, &self.auth_context.user_name),
};
let schema_path = self.bind_schema_path(schema_name);

let (table, _schema_name) =
self.catalog
Expand Down
47 changes: 9 additions & 38 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ use self::cursor_manager::CursorManager;
use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
use crate::catalog::connection_catalog::ConnectionCatalog;
use crate::catalog::root_catalog::Catalog;
use crate::catalog::root_catalog::{Catalog, SchemaPath};
use crate::catalog::secret_catalog::SecretCatalog;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::subscription_catalog::SubscriptionCatalog;
Expand Down Expand Up @@ -987,19 +987,9 @@ impl SessionImpl {
let user_name = &self.auth_context().user_name;

let catalog_reader = self.env().catalog_reader().read_guard();
let schema = match schema_name {
Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
};
let schema = catalog_reader.get_schema_by_name(db_name, schema.name().as_str())?;
let connection = schema
.get_connection_by_name(connection_name)
.ok_or_else(|| {
RwError::from(ErrorCode::ItemNotFound(format!(
"connection {} not found",
connection_name
)))
})?;
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
let (connection, _) =
catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;
Ok(connection.clone())
}

Expand Down Expand Up @@ -1034,19 +1024,9 @@ impl SessionImpl {
let user_name = &self.auth_context().user_name;

let catalog_reader = self.env().catalog_reader().read_guard();
let schema = match schema_name {
Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
};
let schema = catalog_reader.get_schema_by_name(db_name, schema.name().as_str())?;
let subscription = schema
.get_subscription_by_name(subscription_name)
.ok_or_else(|| {
RwError::from(ErrorCode::ItemNotFound(format!(
"subscription {} not found",
subscription_name
)))
})?;
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
let (subscription, _) =
catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
Ok(subscription.clone())
}

Expand Down Expand Up @@ -1084,17 +1064,8 @@ impl SessionImpl {
let user_name = &self.auth_context().user_name;

let catalog_reader = self.env().catalog_reader().read_guard();
let schema = match schema_name {
Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
};
let schema = catalog_reader.get_schema_by_name(db_name, schema.name().as_str())?;
let secret = schema.get_secret_by_name(secret_name).ok_or_else(|| {
RwError::from(ErrorCode::ItemNotFound(format!(
"secret {} not found",
secret_name
)))
})?;
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;
Ok(secret.clone())
}

Expand Down

0 comments on commit 2c0d286

Please sign in to comment.