Skip to content

Commit

Permalink
revert:feat(frontend): support alter source pause/resume (#19636) (#2…
Browse files Browse the repository at this point in the history
…0126) (#20137)

Signed-off-by: MrCroxx <[email protected]>
Co-authored-by: Croxx <[email protected]>
  • Loading branch information
github-actions[bot] and MrCroxx authored Jan 13, 2025
1 parent c9b3652 commit c7f206d
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 183 deletions.

This file was deleted.

24 changes: 5 additions & 19 deletions src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
use risingwave_sqlparser::ast::ObjectName;
use risingwave_sqlparser::parser::{SOURCE_RATE_LIMIT_PAUSED, SOURCE_RATE_LIMIT_RESUMED};

use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::error::ErrorCode::InvalidInputSyntax;
use crate::error::{ErrorCode, Result};
use crate::Binder;

Expand Down Expand Up @@ -58,17 +56,6 @@ pub async fn handle_alter_streaming_rate_limit(
let reader = session.env().catalog_reader().read_guard();
let (source, schema_name) =
reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
if let Some(prev_limit) = source.rate_limit {
if rate_limit == SOURCE_RATE_LIMIT_PAUSED
|| (prev_limit != 0 && rate_limit == SOURCE_RATE_LIMIT_RESUMED)
{
return Err(InvalidInputSyntax(
"PAUSE or RESUME is invalid when the stream has pre configured ratelimit."
.to_owned(),
)
.into());
}
}
session.check_privilege_for_drop_alter(schema_name, &**source)?;
(StatementType::ALTER_SOURCE, source.id)
}
Expand Down Expand Up @@ -103,7 +90,10 @@ pub async fn handle_alter_streaming_rate_limit(
let (table, schema_name) =
reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
if table.table_type != TableType::Table {
return Err(InvalidInputSyntax(format!("\"{table_name}\" is not a table",)).into());
return Err(ErrorCode::InvalidInputSyntax(format!(
"\"{table_name}\" is not a table",
))
.into());
}
session.check_privilege_for_drop_alter(schema_name, &**table)?;
(StatementType::ALTER_TABLE, table.id.table_id)
Expand All @@ -114,11 +104,7 @@ pub async fn handle_alter_streaming_rate_limit(
let meta_client = session.env().meta_client();

let rate_limit = if rate_limit < 0 {
if rate_limit == SOURCE_RATE_LIMIT_PAUSED {
Some(0)
} else {
None
}
None
} else {
Some(rate_limit as u32)
};
Expand Down
9 changes: 3 additions & 6 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::ast::{
display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SecretRefValue,
SetVariableValue, Value,
};
use crate::parser::{SOURCE_RATE_LIMIT_PAUSED, SOURCE_RATE_LIMIT_RESUMED};
use crate::tokenizer::Token;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -466,11 +465,9 @@ impl fmt::Display for AlterSourceOperation {
AlterSourceOperation::RefreshSchema => {
write!(f, "REFRESH SCHEMA")
}
AlterSourceOperation::SetSourceRateLimit { rate_limit } => match *rate_limit {
SOURCE_RATE_LIMIT_PAUSED => write!(f, "PAUSE"),
SOURCE_RATE_LIMIT_RESUMED => write!(f, "RESUME"),
_ => write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit),
},
AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
}
AlterSourceOperation::SwapRenameSource { target_source } => {
write!(f, "SWAP WITH {}", target_source)
}
Expand Down
2 changes: 0 additions & 2 deletions src/sqlparser/src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ define_keywords!(
PARTITIONED,
PARTITIONS,
PASSWORD,
PAUSE,
PERCENT,
PERCENTILE_CONT,
PERCENTILE_DISC,
Expand Down Expand Up @@ -435,7 +434,6 @@ define_keywords!(
REPLACE,
RESTRICT,
RESULT,
RESUME,
RETURN,
RETURNING,
RETURNS,
Expand Down
14 changes: 1 addition & 13 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ use crate::{impl_parse_to, parser_v2};

pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector";
pub(crate) const WEBHOOK_CONNECTOR: &str = "webhook";
// reserve i32::MIN for pause.
pub const SOURCE_RATE_LIMIT_PAUSED: i32 = i32::MIN;
// reserve i32::MIN + 1 for resume.
pub const SOURCE_RATE_LIMIT_RESUMED: i32 = i32::MIN + 1;

#[derive(Debug, Clone, PartialEq)]
pub enum ParserError {
Expand Down Expand Up @@ -3559,17 +3555,9 @@ impl Parser<'_> {
} else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) {
let target_source = self.parse_object_name()?;
AlterSourceOperation::SwapRenameSource { target_source }
} else if self.parse_keyword(Keyword::PAUSE) {
AlterSourceOperation::SetSourceRateLimit {
rate_limit: SOURCE_RATE_LIMIT_PAUSED,
}
} else if self.parse_keyword(Keyword::RESUME) {
AlterSourceOperation::SetSourceRateLimit {
rate_limit: SOURCE_RATE_LIMIT_RESUMED,
}
} else {
return self.expected(
"RENAME, ADD COLUMN, OWNER TO, SET, PAUSE, RESUME, or SOURCE_RATE_LIMIT after ALTER SOURCE",
"RENAME, ADD COLUMN, OWNER TO, SET or SOURCE_RATE_LIMIT after ALTER SOURCE",
);
};

Expand Down

0 comments on commit c7f206d

Please sign in to comment.