Skip to content

Commit

Permalink
Merge branch 'main' into xxh/fix-iceberg-source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Jan 21, 2025
2 parents e8e4e0f + c0bea54 commit 3a179ca
Show file tree
Hide file tree
Showing 27 changed files with 439 additions and 179 deletions.
4 changes: 4 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,10 @@ if [[ $ENABLE_COREDUMP == "true" ]]; then
ulimit -c unlimited
fi
if [[ $ENABLE_BUILD_RW_CONNECTOR == "true" ]]; then
export CONNECTOR_LIBS_PATH="${PREFIX_BIN}/connector-node/libs/"
fi
set -x
target/${RISEDEV_BUILD_TARGET_DIR}${BUILD_MODE_DIR}/risingwave playground $@
'''
Expand Down
1 change: 1 addition & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ message ExprNode {
PG_IS_IN_RECOVERY = 2411;
RW_RECOVERY_STATUS = 2412;
RW_EPOCH_TO_TS = 2413;
PG_TABLE_IS_VISIBLE = 2414;

// EXTERNAL
ICEBERG_TRANSFORM = 2201;
Expand Down
12 changes: 7 additions & 5 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,18 @@ message ListTableFragmentsResponse {
map<uint32, TableFragmentInfo> table_fragments = 1;
}

message ListTableFragmentStatesRequest {}
message ListStreamingJobStatesRequest {}

message ListTableFragmentStatesResponse {
message TableFragmentState {
message ListStreamingJobStatesResponse {
message StreamingJobState {
uint32 table_id = 1;
TableFragments.State state = 2;
TableParallelism parallelism = 3;
uint32 max_parallelism = 4;
string name = 5;
string resource_group = 6;
}
repeated TableFragmentState states = 1;
repeated StreamingJobState states = 1;
}

message ListFragmentDistributionRequest {}
Expand Down Expand Up @@ -333,7 +335,7 @@ service StreamManagerService {
rpc Resume(ResumeRequest) returns (ResumeResponse);
rpc CancelCreatingJobs(CancelCreatingJobsRequest) returns (CancelCreatingJobsResponse);
rpc ListTableFragments(ListTableFragmentsRequest) returns (ListTableFragmentsResponse);
rpc ListTableFragmentStates(ListTableFragmentStatesRequest) returns (ListTableFragmentStatesResponse);
rpc ListStreamingJobStates(ListStreamingJobStatesRequest) returns (ListStreamingJobStatesResponse);
rpc ListFragmentDistribution(ListFragmentDistributionRequest) returns (ListFragmentDistributionResponse);
rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse);
rpc ListActorSplits(ListActorSplitsRequest) returns (ListActorSplitsResponse);
Expand Down
26 changes: 15 additions & 11 deletions src/frontend/planner_test/tests/testdata/output/subquery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@
ORDER BY 1,2;
logical_plan: |-
LogicalProject { exprs: [rw_schemas.name, rw_tables.name, Case(($expr1 = 'r':Varchar), 'table':Varchar, ($expr1 = 'v':Varchar), 'view':Varchar, ($expr1 = 'm':Varchar), 'materialized view':Varchar, ($expr1 = 'i':Varchar), 'index':Varchar, ($expr1 = 'S':Varchar), 'sequence':Varchar, ($expr1 = 's':Varchar), 'special':Varchar, ($expr1 = 't':Varchar), 'TOAST table':Varchar, ($expr1 = 'f':Varchar), 'foreign table':Varchar, ($expr1 = 'p':Varchar), 'partitioned table':Varchar, ($expr1 = 'I':Varchar), 'partitioned index':Varchar) as $expr4, PgGetUserbyid(rw_tables.owner) as $expr5] }
└─LogicalFilter { predicate: In($expr1, 'r':Varchar, 'p':Varchar, 'v':Varchar, 'm':Varchar, 'S':Varchar, 'f':Varchar, '':Varchar) AND (rw_schemas.name <> 'pg_catalog':Varchar) AND Not(RegexpEq(rw_schemas.name, '^pg_toast':Varchar)) AND (rw_schemas.name <> 'information_schema':Varchar) }
└─LogicalFilter { predicate: In($expr1, 'r':Varchar, 'p':Varchar, 'v':Varchar, 'm':Varchar, 'S':Varchar, 'f':Varchar, '':Varchar) AND (rw_schemas.name <> 'pg_catalog':Varchar) AND Not(RegexpEq(rw_schemas.name, '^pg_toast':Varchar)) AND (rw_schemas.name <> 'information_schema':Varchar) AND PgTableIsVisible(rw_tables.id) }
└─LogicalJoin { type: LeftOuter, on: (rw_schemas.id = rw_tables.schema_id), output: all }
├─LogicalShare { id: 18 }
│ └─LogicalProject { exprs: [rw_tables.id, rw_tables.name, rw_tables.schema_id, rw_tables.owner, 'p':Varchar, Case(('table':Varchar = 'table':Varchar), 'r':Varchar, ('table':Varchar = 'system table':Varchar), 'r':Varchar, ('table':Varchar = 'index':Varchar), 'i':Varchar, ('table':Varchar = 'view':Varchar), 'v':Varchar, ('table':Varchar = 'materialized view':Varchar), 'm':Varchar) as $expr1, 0:Int32::Int16 as $expr2, 0:Int32, 0:Int32, Array as $expr3, false:Boolean, null:Varchar] }
Expand Down Expand Up @@ -264,24 +264,28 @@
├─BatchExchange { order: [], dist: HashShard(rw_tables.schema_id) }
│ └─BatchUnion { all: true }
│ ├─BatchProject { exprs: [rw_tables.name, 'table':Varchar, rw_tables.schema_id, rw_tables.owner] }
│ │ └─BatchScan { table: rw_tables, columns: [rw_tables.name, rw_tables.schema_id, rw_tables.owner], distribution: Single }
│ │ └─BatchFilter { predicate: PgTableIsVisible(rw_tables.id) }
│ │ └─BatchScan { table: rw_tables, columns: [rw_tables.name, rw_tables.schema_id, rw_tables.owner, rw_tables.id], distribution: Single }
│ ├─BatchProject { exprs: [rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner] }
│ │ └─BatchScan { table: rw_system_tables, columns: [rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner], distribution: Single }
│ │ └─BatchFilter { predicate: PgTableIsVisible(rw_system_tables.id) }
│ │ └─BatchScan { table: rw_system_tables, columns: [rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.id], distribution: Single }
│ ├─BatchProject { exprs: [rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner] }
│ │ └─BatchFilter { predicate: null:Boolean }
│ │ └─BatchScan { table: rw_sources, columns: [rw_sources.name, rw_sources.schema_id, rw_sources.owner], distribution: Single }
│ │ └─BatchFilter { predicate: null:Boolean AND PgTableIsVisible(rw_sources.id) }
│ │ └─BatchScan { table: rw_sources, columns: [rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.id], distribution: Single }
│ ├─BatchProject { exprs: [rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner] }
│ │ └─BatchValues { rows: [] }
│ ├─BatchProject { exprs: [rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner] }
│ │ └─BatchFilter { predicate: null:Boolean }
│ │ └─BatchScan { table: rw_sinks, columns: [rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner], distribution: Single }
│ │ └─BatchFilter { predicate: null:Boolean AND PgTableIsVisible(rw_sinks.id) }
│ │ └─BatchScan { table: rw_sinks, columns: [rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.id], distribution: Single }
│ ├─BatchProject { exprs: [rw_subscriptions.name, 'subscription':Varchar, rw_subscriptions.schema_id, rw_subscriptions.owner] }
│ │ └─BatchFilter { predicate: null:Boolean }
│ │ └─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.name, rw_subscriptions.schema_id, rw_subscriptions.owner], distribution: Single }
│ │ └─BatchFilter { predicate: null:Boolean AND PgTableIsVisible(rw_subscriptions.id) }
│ │ └─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.name, rw_subscriptions.schema_id, rw_subscriptions.owner, rw_subscriptions.id], distribution: Single }
│ ├─BatchProject { exprs: [rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner] }
│ │ └─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner], distribution: Single }
│ │ └─BatchFilter { predicate: PgTableIsVisible(rw_materialized_views.id) }
│ │ └─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.id], distribution: Single }
│ └─BatchProject { exprs: [rw_views.name, 'view':Varchar, rw_views.schema_id, rw_views.owner] }
│ └─BatchScan { table: rw_views, columns: [rw_views.name, rw_views.schema_id, rw_views.owner], distribution: Single }
│ └─BatchFilter { predicate: PgTableIsVisible(rw_views.id) }
│ └─BatchScan { table: rw_views, columns: [rw_views.name, rw_views.schema_id, rw_views.owner, rw_views.id], distribution: Single }
└─BatchExchange { order: [], dist: HashShard(rw_schemas.id) }
└─BatchFilter { predicate: (rw_schemas.name <> 'pg_catalog':Varchar) AND Not(RegexpEq(rw_schemas.name, '^pg_toast':Varchar)) AND (rw_schemas.name <> 'information_schema':Varchar) }
└─BatchScan { table: rw_schemas, columns: [rw_schemas.id, rw_schemas.name], distribution: Single }
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/expr/function/builtin_scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ impl Binder {
Ok(ExprImpl::literal_varchar(new_value.to_string()))
}))),
("format_type", raw_call(ExprType::FormatType)),
("pg_table_is_visible", raw_literal(ExprImpl::literal_bool(true))),
("pg_table_is_visible", raw_call(ExprType::PgTableIsVisible)),
("pg_type_is_visible", raw_literal(ExprImpl::literal_bool(true))),
("pg_get_constraintdef", raw_literal(ExprImpl::literal_null(DataType::Varchar))),
("pg_get_partkeydef", raw_literal(ExprImpl::literal_null(DataType::Varchar))),
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,17 @@ impl SchemaCatalog {
}
}

pub fn contains_object(&self, oid: u32) -> bool {
self.table_by_id.contains_key(&TableId::new(oid))
|| self.index_by_id.contains_key(&IndexId::new(oid))
|| self.source_by_id.contains_key(&oid)
|| self.sink_by_id.contains_key(&oid)
|| self.view_by_id.contains_key(&oid)
|| self.function_by_id.contains_key(&FunctionId::new(oid))
|| self.subscription_by_id.contains_key(&oid)
|| self.connection_by_id.contains_key(&oid)
}

pub fn id(&self) -> SchemaId {
self.id
}
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use risingwave_common::error::BoxedError;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::types::DataType;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::table_parallelism::{PbFixedParallelism, PbParallelism};
use risingwave_pb::user::grant_privilege::Object;

Expand Down Expand Up @@ -216,7 +216,7 @@ pub fn infer_dummy_view_sql(columns: &[SystemCatalogColumnsDef<'_>]) -> String {
)
}

fn extract_parallelism_from_table_state(state: &TableFragmentState) -> String {
fn extract_parallelism_from_table_state(state: &StreamingJobState) -> String {
match state
.parallelism
.as_ref()
Expand Down Expand Up @@ -314,6 +314,10 @@ pub fn get_sys_views_in_schema(schema_name: &str) -> Vec<Arc<ViewCatalog>> {
.collect()
}

pub fn is_system_catalog(oid: u32) -> bool {
oid >= SYS_CATALOG_START_ID as u32
}

/// The global registry of all builtin catalogs.
pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,5 @@ mod rw_actor_id_to_ddl;
mod rw_actor_splits;
mod rw_fragment_id_to_ddl;
mod rw_internal_table_info;
mod rw_streaming_jobs;
mod rw_worker_actor_count;
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct RwActorInfo {
async fn read_rw_actors(reader: &SysCatalogReaderImpl) -> Result<Vec<RwActorInfo>> {
let table_ids = reader
.meta_client
.list_table_fragment_states()
.list_streaming_job_states()
.await?
.into_iter()
.map(|fragment| fragment.table_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

use crate::catalog::system_catalog::{extract_parallelism_from_table_state, SysCatalogReaderImpl};
use crate::error::Result;

#[derive(Fields)]
struct RwStreamingJob {
#[primary_key]
job: i32,
name: String,
status: String,
parallelism: String,
max_parallelism: i32,
resource_group: String,
}

#[system_catalog(table, "rw_catalog.rw_streaming_jobs")]
async fn read_rw_streaming_jobs(reader: &SysCatalogReaderImpl) -> Result<Vec<RwStreamingJob>> {
let states = reader.meta_client.list_streaming_job_states().await?;

Ok(states
.into_iter()
.map(|state| {
let parallelism = extract_parallelism_from_table_state(&state);
RwStreamingJob {
job: state.table_id as i32,
status: state.state().as_str_name().into(),
name: state.name,
parallelism: parallelism.to_uppercase(),
max_parallelism: state.max_parallelism as i32,
resource_group: state.resource_group,
}
})
.collect())
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct RwTableFragment {
async fn read_rw_table_fragments_info(
reader: &SysCatalogReaderImpl,
) -> Result<Vec<RwTableFragment>> {
let states = reader.meta_client.list_table_fragment_states().await?;
let states = reader.meta_client.list_streaming_job_states().await?;

Ok(states
.into_iter()
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/expr/function_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ mod pg_get_viewdef;
mod pg_index_column_has_property;
mod pg_indexes_size;
mod pg_relation_size;
mod pg_table_is_visible;
mod rw_epoch_to_ts;
mod rw_recovery_status;
72 changes: 72 additions & 0 deletions src/frontend/src/expr/function_impl/pg_table_is_visible.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::acl::AclMode;
use risingwave_common::session_config::SearchPath;
use risingwave_expr::{capture_context, function, Result};
use risingwave_pb::user::grant_privilege::Object as GrantObject;

use super::context::{AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH, USER_INFO_READER};
use crate::catalog::system_catalog::is_system_catalog;
use crate::catalog::CatalogReader;
use crate::expr::function_impl::has_privilege::user_not_found_err;
use crate::session::AuthContext;
use crate::user::user_service::UserInfoReader;

#[function("pg_table_is_visible(int4) -> boolean")]
fn pg_table_is_visible(oid: i32) -> Result<Option<bool>> {
pg_table_is_visible_impl_captured(oid)
}

#[capture_context(CATALOG_READER, USER_INFO_READER, AUTH_CONTEXT, SEARCH_PATH, DB_NAME)]
fn pg_table_is_visible_impl(
catalog: &CatalogReader,
user_info: &UserInfoReader,
auth_context: &AuthContext,
search_path: &SearchPath,
db_name: &str,
oid: i32,
) -> Result<Option<bool>> {
// To maintain consistency with PostgreSQL, we ensure that system catalogs are always visible.
if is_system_catalog(oid as u32) {
return Ok(Some(true));
}

let catalog_reader = catalog.read_guard();
let user_reader = user_info.read_guard();
let user_info = user_reader
.get_user_by_name(&auth_context.user_name)
.ok_or(user_not_found_err(
format!("User {} not found", auth_context.user_name).as_str(),
))?;
// Return true only if:
// 1. The schema of the object exists in the search path.
// 2. User have `USAGE` privilege on the schema.
for schema in search_path.path() {
if let Ok(schema) = catalog_reader.get_schema_by_name(db_name, schema) {
if schema.contains_object(oid as u32) {
return if user_info.is_super
|| user_info
.check_privilege(&GrantObject::SchemaId(schema.id()), AclMode::Usage)
{
Ok(Some(true))
} else {
Ok(Some(false))
};
}
}
}

Ok(None)
}
3 changes: 2 additions & 1 deletion src/frontend/src/expr/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ impl ExprVisitor for ImpureAnalyzer {
| Type::HasSchemaPrivilege
| Type::MakeTimestamptz
| Type::PgIsInRecovery
| Type::RwRecoveryStatus => self.impure = true,
| Type::RwRecoveryStatus
| Type::PgTableIsVisible => self.impure = true,
}
}
}
Expand Down
Loading

0 comments on commit 3a179ca

Please sign in to comment.