diff --git a/e2e_test/source_inline/kafka/alter/resume_pause_source_kafka.slt.serial b/e2e_test/source_inline/kafka/alter/resume_pause_source_kafka.slt.serial deleted file mode 100644 index 0922b4e56eee3..0000000000000 --- a/e2e_test/source_inline/kafka/alter/resume_pause_source_kafka.slt.serial +++ /dev/null @@ -1,143 +0,0 @@ -control substitution on - -statement ok -SET streaming_use_shared_source TO false; - -############## Create kafka seed data - -statement ok -create table kafka_seed_data (v1 int); - -statement ok -insert into kafka_seed_data select * from generate_series(1, 1000); - -############## Sink into kafka - -statement ok -create sink kafka_sink -from - kafka_seed_data with ( - ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, - topic = 'test_rate_limit', - type = 'append-only', - force_append_only='true' -); - -############## Source from kafka (rate_limit = 0) - -# Wait for the topic to create -skipif in-memory -sleep 5s - -statement ok -create source kafka_source (v1 int) with ( - ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, - topic = 'test_rate_limit', - scan.startup.mode = 'earliest', -) FORMAT PLAIN ENCODE JSON - - -statement ok -create source kafka_source2 (v1 int) with ( - ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, - topic = 'test_rate_limit', - scan.startup.mode = 'earliest', - source_rate_limit = 100, -) FORMAT PLAIN ENCODE JSON - -statement ok -flush; - -############## Check data - -skipif in-memory -sleep 3s - -############## Create MV on source - -# This should be ignored. -statement ok -SET SOURCE_RATE_LIMIT=1000; - -statement ok -create materialized view rl_mv1 as select count(*) from kafka_source; - -statement ok -create materialized view rl_mv2 as select count(*) from kafka_source; - -statement ok -create materialized view rl_mv3 as select count(*) from kafka_source; - -skipif in-memory -statement count 0 -alter source kafka_source pause; - -skipif in-memory -statement error -alter source kafka_source2 pause; - -query T -select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id -order by name; ----- -rl_mv1 SOURCE {SOURCE} 0 -rl_mv2 SOURCE {SOURCE} 0 -rl_mv3 SOURCE {SOURCE} 0 - - -skipif in-memory -statement count 0 -alter source kafka_source resume; - -# rate limit becomes None -query T -select count(*) from rw_rate_limit; ----- -0 - -skipif in-memory -sleep 3s - -skipif in-memory -query I -select count(*) > 0 from rl_mv1; ----- -t - -skipif in-memory -query I -select count(*) > 0 from rl_mv2; ----- -t - -skipif in-memory -query I -select count(*) > 0 from rl_mv3; ----- -t - -############## Cleanup - -statement ok -drop materialized view rl_mv1; - -statement ok -drop materialized view rl_mv2; - -statement ok -drop materialized view rl_mv3; - -statement ok -drop source kafka_source; - -statement ok -drop source kafka_source2; - -statement ok -drop sink kafka_sink; - -statement ok -drop table kafka_seed_data; - -statement ok -SET streaming_use_shared_source TO true; diff --git a/src/frontend/src/handler/alter_streaming_rate_limit.rs b/src/frontend/src/handler/alter_streaming_rate_limit.rs index d41104f87d488..57d6ef0bce514 100644 --- a/src/frontend/src/handler/alter_streaming_rate_limit.rs +++ b/src/frontend/src/handler/alter_streaming_rate_limit.rs @@ -16,12 +16,10 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail; use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget; use risingwave_sqlparser::ast::ObjectName; -use risingwave_sqlparser::parser::{SOURCE_RATE_LIMIT_PAUSED, SOURCE_RATE_LIMIT_RESUMED}; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::table_catalog::TableType; -use crate::error::ErrorCode::InvalidInputSyntax; use crate::error::{ErrorCode, Result}; use crate::Binder; @@ -58,17 +56,6 @@ pub async fn handle_alter_streaming_rate_limit( let reader = session.env().catalog_reader().read_guard(); let (source, schema_name) = reader.get_source_by_name(db_name, schema_path, &real_table_name)?; - if let Some(prev_limit) = source.rate_limit { - if rate_limit == SOURCE_RATE_LIMIT_PAUSED - || (prev_limit != 0 && rate_limit == SOURCE_RATE_LIMIT_RESUMED) - { - return Err(InvalidInputSyntax( - "PAUSE or RESUME is invalid when the stream has pre configured ratelimit." - .to_owned(), - ) - .into()); - } - } session.check_privilege_for_drop_alter(schema_name, &**source)?; (StatementType::ALTER_SOURCE, source.id) } @@ -103,7 +90,10 @@ pub async fn handle_alter_streaming_rate_limit( let (table, schema_name) = reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?; if table.table_type != TableType::Table { - return Err(InvalidInputSyntax(format!("\"{table_name}\" is not a table",)).into()); + return Err(ErrorCode::InvalidInputSyntax(format!( + "\"{table_name}\" is not a table", + )) + .into()); } session.check_privilege_for_drop_alter(schema_name, &**table)?; (StatementType::ALTER_TABLE, table.id.table_id) @@ -114,11 +104,7 @@ pub async fn handle_alter_streaming_rate_limit( let meta_client = session.env().meta_client(); let rate_limit = if rate_limit < 0 { - if rate_limit == SOURCE_RATE_LIMIT_PAUSED { - Some(0) - } else { - None - } + None } else { Some(rate_limit as u32) }; diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index a17dc9297089b..1453df62d0fee 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -25,7 +25,6 @@ use crate::ast::{ display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SecretRefValue, SetVariableValue, Value, }; -use crate::parser::{SOURCE_RATE_LIMIT_PAUSED, SOURCE_RATE_LIMIT_RESUMED}; use crate::tokenizer::Token; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -466,11 +465,9 @@ impl fmt::Display for AlterSourceOperation { AlterSourceOperation::RefreshSchema => { write!(f, "REFRESH SCHEMA") } - AlterSourceOperation::SetSourceRateLimit { rate_limit } => match *rate_limit { - SOURCE_RATE_LIMIT_PAUSED => write!(f, "PAUSE"), - SOURCE_RATE_LIMIT_RESUMED => write!(f, "RESUME"), - _ => write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit), - }, + AlterSourceOperation::SetSourceRateLimit { rate_limit } => { + write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit) + } AlterSourceOperation::SwapRenameSource { target_source } => { write!(f, "SWAP WITH {}", target_source) } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 84f3d0eb42a68..4f9178ed5dc5e 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -385,7 +385,6 @@ define_keywords!( PARTITIONED, PARTITIONS, PASSWORD, - PAUSE, PERCENT, PERCENTILE_CONT, PERCENTILE_DISC, @@ -435,7 +434,6 @@ define_keywords!( REPLACE, RESTRICT, RESULT, - RESUME, RETURN, RETURNING, RETURNS, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index f9ee9f7275433..06be8bc1cb137 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -39,10 +39,6 @@ use crate::{impl_parse_to, parser_v2}; pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector"; pub(crate) const WEBHOOK_CONNECTOR: &str = "webhook"; -// reserve i32::MIN for pause. -pub const SOURCE_RATE_LIMIT_PAUSED: i32 = i32::MIN; -// reserve i32::MIN + 1 for resume. -pub const SOURCE_RATE_LIMIT_RESUMED: i32 = i32::MIN + 1; #[derive(Debug, Clone, PartialEq)] pub enum ParserError { @@ -3559,17 +3555,9 @@ impl Parser<'_> { } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { let target_source = self.parse_object_name()?; AlterSourceOperation::SwapRenameSource { target_source } - } else if self.parse_keyword(Keyword::PAUSE) { - AlterSourceOperation::SetSourceRateLimit { - rate_limit: SOURCE_RATE_LIMIT_PAUSED, - } - } else if self.parse_keyword(Keyword::RESUME) { - AlterSourceOperation::SetSourceRateLimit { - rate_limit: SOURCE_RATE_LIMIT_RESUMED, - } } else { return self.expected( - "RENAME, ADD COLUMN, OWNER TO, SET, PAUSE, RESUME, or SOURCE_RATE_LIMIT after ALTER SOURCE", + "RENAME, ADD COLUMN, OWNER TO, SET or SOURCE_RATE_LIMIT after ALTER SOURCE", ); };